diff --git a/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs b/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs index ef2bff7e6cfd..04a7ac3c8b70 100644 --- a/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs +++ b/src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs @@ -61,6 +61,7 @@ internal sealed partial class Win32FileStream : FileStreamBase private long _appendStart;// When appending, prevent overwriting file. private Task _lastSynchronouslyCompletedTask = null; + private Task _activeBufferOperation = null; [System.Security.SecuritySafeCritical] public Win32FileStream(String path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options, FileStream parent) : base(parent) @@ -331,6 +332,10 @@ private unsafe void VerifyHandleIsSync() } } + private bool HasActiveBufferOperation + { + get { return _activeBufferOperation != null && !_activeBufferOperation.IsCompleted; } + } public override bool CanRead { @@ -520,6 +525,26 @@ private void FlushRead() _readLen = 0; } + // Returns a task that flushes the internal write buffer + private Task FlushWriteAsync(CancellationToken cancellationToken) + { + Debug.Assert(_isAsync); + Debug.Assert(_readPos == 0 && _readLen == 0, "FileStream: Read buffer must be empty in FlushWriteAsync!"); + + // If the buffer is already flushed, don't spin up the OS write + if (_writePos == 0) return Task.CompletedTask; + + Task flushTask = WriteInternalCoreAsync(_buffer, 0, _writePos, cancellationToken); + _writePos = 0; + + // Update the active buffer operation + _activeBufferOperation = HasActiveBufferOperation ? + Task.WhenAll(_activeBufferOperation, flushTask) : + flushTask; + + return flushTask; + } + // Writes are buffered. Anytime the buffer fills up // (_writePos + delta > _bufferSize) or the buffer switches to reading // and there is left over data (_writePos > 0), this function must be called. @@ -529,7 +554,7 @@ private void FlushWrite(bool calledFromFinalizer) if (_isAsync) { - Task writeTask = WriteInternalCoreAsync(_buffer, 0, _writePos, CancellationToken.None); + Task writeTask = FlushWriteAsync(CancellationToken.None); // With our Whidbey async IO & overlapped support for AD unloads, // we don't strictly need to block here to release resources // since that support takes care of the pinning & freeing the @@ -1259,52 +1284,107 @@ private Task WriteInternalAsync(byte[] array, int offset, int numBytes, Cancella if (!_parent.CanWrite) throw __Error.GetWriteNotSupported(); Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen), "We're either reading or writing, but not both."); + Debug.Assert(!_isPipe || (_readPos == 0 && _readLen == 0), "Win32FileStream must not have buffered data here! Pipes should be unidirectional."); - if (_isPipe) + bool writeDataStoredInBuffer = false; + if (!_isPipe) // avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadInternalAsyncCore) { - // Pipes are tricky, at least when you have 2 different pipes - // that you want to use simultaneously. When redirecting stdout - // & stderr with the Process class, it's easy to deadlock your - // parent & child processes when doing writes 4K at a time. The - // OS appears to use a 4K buffer internally. If you write to a - // pipe that is full, you will block until someone read from - // that pipe. If you try reading from an empty pipe and - // Win32FileStream's ReadAsync blocks waiting for data to fill it's - // internal buffer, you will be blocked. In a case where a child - // process writes to stdout & stderr while a parent process tries - // reading from both, you can easily get into a deadlock here. - // To avoid this deadlock, don't buffer when doing async IO on - // pipes. - Debug.Assert(_readPos == 0 && _readLen == 0, "Win32FileStream must not have buffered data here! Pipes should be unidirectional."); + // Ensure the buffer is clear for writing + if (_writePos == 0) + { + if (_readPos < _readLen) + { + FlushRead(); + } + _readPos = 0; + _readLen = 0; + } - if (_writePos > 0) - FlushWrite(false); + // Determine how much space remains in the buffer + int remainingBuffer = _bufferSize - _writePos; + Debug.Assert(remainingBuffer >= 0); - return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken); - } + // Simple/common case: + // - The write is smaller than our buffer, such that it's worth considering buffering it. + // - There's no active flush operation, such that we don't have to worry about the existing buffer being in use. + // - And the data we're trying to write fits in the buffer, meaning it wasn't already filled by previous writes. + // In that case, just store it in the buffer. + if (numBytes < _bufferSize && !HasActiveBufferOperation && numBytes <= remainingBuffer) + { + if (_buffer == null) + _buffer = new byte[_bufferSize]; - // Handle buffering. - if (_writePos == 0) - { - if (_readPos < _readLen) FlushRead(); - _readPos = 0; - _readLen = 0; + Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes); + _writePos += numBytes; + writeDataStoredInBuffer = true; + + // There is one special-but-common case, common because devs often use + // byte[] sizes that are powers of 2 and thus fit nicely into our buffer, which is + // also a power of 2. If after our write the buffer still has remaining space, + // then we're done and can return a completed task now. But if we filled the buffer + // completely, we want to do the asynchronous flush/write as part of this operation + // rather than waiting until the next write that fills the buffer. + if (numBytes != remainingBuffer) + return Task.CompletedTask; + + Debug.Assert(_writePos == _bufferSize); + } } - int n = _bufferSize - _writePos; - if (numBytes <= n) + // At this point, at least one of the following is true: + // 1. There was an active flush operation (it could have completed by now, though). + // 2. The data doesn't fit in the remaining buffer (or it's a pipe and we chose not to try). + // 3. We wrote all of the data to the buffer, filling it. + // + // If there's an active operation, we can't touch the current buffer because it's in use. + // That gives us a choice: we can either allocate a new buffer, or we can skip the buffer + // entirely (even if the data would otherwise fit in it). For now, for simplicity, we do + // the latter; it could also have performance wins due to OS-level optimizations, and we could + // potentially add support for PreAllocatedOverlapped due to having a single buffer. (We can + // switch to allocating a new buffer, potentially experimenting with buffer pooling, should + // performance data suggest it's appropriate.) + // + // If the data doesn't fit in the remaining buffer, it could be because it's so large + // it's greater than the entire buffer size, in which case we'd always skip the buffer, + // or it could be because there's more data than just the space remaining. For the latter + // case, we need to issue an asynchronous write to flush that data, which then turns this into + // the first case above with an active operation. + // + // If we already stored the data, then we have nothing additional to write beyond what + // we need to flush. + // + // In any of these cases, we have the same outcome: + // - If there's data in the buffer, flush it by writing it out asynchronously. + // - Then, if there's any data to be written, issue a write for it concurrently. + // We return a Task that represents one or both. + + // Flush the buffer asynchronously if there's anything to flush + Task flushTask = null; + if (_writePos > 0) { - if (_writePos == 0) _buffer = new byte[_bufferSize]; - Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes); - _writePos += numBytes; - - return Task.CompletedTask; + flushTask = FlushWriteAsync(cancellationToken); + + // If we already copied all of the data into the buffer, + // simply return the flush task here. Same goes for if the task has + // already completed and was unsuccessful. + if (writeDataStoredInBuffer || + flushTask.IsFaulted || + flushTask.IsCanceled) + { + return flushTask; + } } - if (_writePos > 0) - FlushWrite(false); + Debug.Assert(!writeDataStoredInBuffer); + Debug.Assert(_writePos == 0); - return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken); + // Finally, issue the write asynchronously, and return a Task that logically + // represents the write operation, including any flushing done. + Task writeTask = WriteInternalCoreAsync(array, offset, numBytes, cancellationToken); + return + (flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask : + (writeTask.Status == TaskStatus.RanToCompletion) ? flushTask : + Task.WhenAll(flushTask, writeTask); } [System.Security.SecuritySafeCritical] // auto-generated @@ -1395,7 +1475,7 @@ private unsafe Task WriteInternalCoreAsync(byte[] bytes, int offset, int numByte throw Win32Marshal.GetExceptionForWin32Error(errorCode); } } - else + else // ERROR_IO_PENDING { // Only once the IO is pending do we register for cancellation completionSource.RegisterForCancellation(); diff --git a/src/System.IO.FileSystem/tests/FileStream/Flush.Sharing.cs b/src/System.IO.FileSystem/tests/FileStream/Flush.Sharing.cs deleted file mode 100644 index b83c3316103e..000000000000 --- a/src/System.IO.FileSystem/tests/FileStream/Flush.Sharing.cs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System; -using System.IO; -using Xunit; - -namespace System.IO.FileSystem.Tests -{ - partial class FileStream_Flush - { - // Requires concurrent readers/writers which is not supported on all platforms - [Fact] - public void FlushWriteWithOtherClient() - { - string fileName = GetTestFilePath(); - - // ensure that we'll be using a buffer larger than our test data - using (FileStream fs = new FileStream(fileName, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite, TestBuffer.Length * 2)) - using (FileStream fsr = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) - { - fs.Write(TestBuffer, 0, TestBuffer.Length); - Assert.Equal(TestBuffer.Length, fs.Length); - - // Make sure that we've actually buffered it, read handle won't see any changes - Assert.Equal(0, fsr.Length); - - // This should cause a write, after it completes the two handles should be in sync - fs.Flush(); - Assert.Equal(TestBuffer.Length, fsr.Length); - - byte[] buffer = new byte[TestBuffer.Length]; - fsr.Read(buffer, 0, buffer.Length); - Assert.Equal(TestBuffer, buffer); - } - } - - } -} diff --git a/src/System.IO.FileSystem/tests/FileStream/Flush.cs b/src/System.IO.FileSystem/tests/FileStream/Flush.cs index 8479e17ba079..226796fd449a 100644 --- a/src/System.IO.FileSystem/tests/FileStream/Flush.cs +++ b/src/System.IO.FileSystem/tests/FileStream/Flush.cs @@ -1,47 +1,156 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System; -using System.IO; using Xunit; namespace System.IO.FileSystem.Tests { public partial class FileStream_Flush : FileSystemTest { - [Fact] - public void FlushThrowsForDisposedStream() + [Theory] + [InlineData(null)] + [InlineData(false)] + [InlineData(true)] + public void FlushThrowsForDisposedStream(bool? flushToDisk) { using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) { fs.Dispose(); - Assert.Throws(() => fs.Flush()); + Assert.Throws(() => Flush(fs, flushToDisk)); } } - [Fact] - public void BasicFlushFunctionality() + [Theory] + [InlineData(null)] + [InlineData(false)] + [InlineData(true)] + public void BasicFlushFunctionality(bool? flushToDisk) { using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) { fs.WriteByte(0); - fs.Flush(); + Flush(fs, flushToDisk); + + fs.WriteByte(0xFF); + Flush(fs, flushToDisk); } } - [Fact] - public void FlushOnReadOnlyFileDoesNotThrow() + [Theory] + [InlineData(null)] + [InlineData(false)] + [InlineData(true)] + public void FlushWhenNothingToFlush(bool? flushToDisk) { - string fileName = GetTestFilePath(); - using (FileStream fs = new FileStream(fileName, FileMode.Create)) + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) { fs.WriteByte(0); + Flush(fs, flushToDisk); + + Flush(fs, flushToDisk); + Flush(fs, flushToDisk); + Flush(fs, flushToDisk); } + } - using (FileStream fs = new FileStream(fileName, FileMode.Open)) + [Theory] + [InlineData(null)] + [InlineData(false)] + [InlineData(true)] + public void FlushOnReadOnlyStreamDoesNotThrow(bool? flushToDisk) + { + string fileName = GetTestFilePath(); + File.WriteAllBytes(fileName, new byte[] { 0 }); + File.SetAttributes(fileName, FileAttributes.ReadOnly); + try + { + using (FileStream fs = new FileStream(fileName, FileMode.Open, FileAccess.Read)) + { + Flush(fs, flushToDisk); + } + } + finally { + File.SetAttributes(fileName, FileAttributes.Normal); + } + } + + [Theory] + [InlineData(null)] + [InlineData(false)] + [InlineData(true)] + public void FlushAfterReading(bool? flushToDisk) + { + string fileName = GetTestFilePath(); + File.WriteAllBytes(fileName, TestBuffer); + using (FileStream fs = new FileStream(fileName, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite, 2)) + { + Assert.Equal(TestBuffer[0], fs.ReadByte()); + Flush(fs, flushToDisk); + } + } + + [Theory] + [InlineData(null)] + [InlineData(false)] + [InlineData(true)] + public void FlushWriteWithOtherClient(bool? flushToDisk) + { + string fileName = GetTestFilePath(); + + // ensure that we'll be using a buffer larger than our test data + using (FileStream fs = new FileStream(fileName, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite, TestBuffer.Length * 2)) + using (FileStream fsr = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + fs.Write(TestBuffer, 0, TestBuffer.Length); + Assert.Equal(TestBuffer.Length, fs.Length); + + // Make sure that we've actually buffered it, read handle won't see any changes + Assert.Equal(0, fsr.Length); + + // This should cause a write, after it completes the two handles should be in sync + Flush(fs, flushToDisk); + Assert.Equal(TestBuffer.Length, fsr.Length); + + byte[] buffer = new byte[TestBuffer.Length]; + fsr.Read(buffer, 0, buffer.Length); + Assert.Equal(TestBuffer, buffer); + } + } + + [Fact] + public void FlushCallsFlush_flushToDisk_False() + { + using (StoreFlushArgFileStream fs = new StoreFlushArgFileStream(GetTestFilePath(), FileMode.Create)) + { + fs.Flush(); + Assert.True(fs.LastFlushArg.HasValue); + Assert.False(fs.LastFlushArg.Value); + } + } + + private static void Flush(FileStream fs, bool? flushArg) + { + if (!flushArg.HasValue) fs.Flush(); + else + fs.Flush(flushArg.Value); + } + + private sealed class StoreFlushArgFileStream : FileStream + { + public StoreFlushArgFileStream(string path, FileMode mode) : base(path, mode) + { + } + + public bool? LastFlushArg; + + public override void Flush(bool flushToDisk) + { + LastFlushArg = flushToDisk; + base.Flush(flushToDisk); } } + } } diff --git a/src/System.IO.FileSystem/tests/FileStream/FlushAsync.cs b/src/System.IO.FileSystem/tests/FileStream/FlushAsync.cs new file mode 100644 index 000000000000..1a61a46218e7 --- /dev/null +++ b/src/System.IO.FileSystem/tests/FileStream/FlushAsync.cs @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.FileSystem.Tests +{ + public partial class FileStream_FlushAsync : FileSystemTest + { + [Fact] + public async Task FlushAsyncThrowsForDisposedStream() + { + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) + { + fs.Dispose(); + await Assert.ThrowsAsync(() => fs.FlushAsync()); + } + } + + [Fact] + public async Task BasicFlushAsyncFunctionality() + { + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) + { + fs.WriteByte(0); + await fs.FlushAsync(); + + fs.WriteByte(0xFF); + await fs.FlushAsync(); + } + } + + [Fact] + public async Task FlushAsyncWhenNothingToFlush() + { + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) + { + fs.WriteByte(0); + await fs.FlushAsync(); + + await fs.FlushAsync(); + await fs.FlushAsync(); + await fs.FlushAsync(); + } + } + + [Fact] + public async Task FlushAsyncOnReadOnlyFileDoesNotThrow() + { + string fileName = GetTestFilePath(); + File.WriteAllBytes(fileName, new byte[] { 0 }); + File.SetAttributes(fileName, FileAttributes.ReadOnly); + try + { + using (FileStream fs = new FileStream(fileName, FileMode.Open, FileAccess.Read)) + { + await fs.FlushAsync(); + } + } + finally + { + File.SetAttributes(fileName, FileAttributes.Normal); + } + } + + [Theory] + [InlineData(null)] + [InlineData(false)] + [InlineData(true)] + public async Task FlushAfterReading(bool? flushToDisk) + { + string fileName = GetTestFilePath(); + File.WriteAllBytes(fileName, TestBuffer); + using (FileStream fs = new FileStream(fileName, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite, 2)) + { + Assert.Equal(TestBuffer[0], fs.ReadByte()); + await fs.FlushAsync(); + } + } + + [Fact] + public async Task FlushAsyncWriteWithOtherClient() + { + string fileName = GetTestFilePath(); + + // ensure that we'll be using a buffer larger than our test data + using (FileStream fs = new FileStream(fileName, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite, TestBuffer.Length * 2)) + using (FileStream fsr = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + fs.Write(TestBuffer, 0, TestBuffer.Length); + Assert.Equal(TestBuffer.Length, fs.Length); + + // Make sure that we've actually buffered it, read handle won't see any changes + Assert.Equal(0, fsr.Length); + + // This should cause a write, after it completes the two handles should be in sync + await fs.FlushAsync(); + Assert.Equal(TestBuffer.Length, fsr.Length); + + byte[] buffer = new byte[TestBuffer.Length]; + fsr.Read(buffer, 0, buffer.Length); + Assert.Equal(TestBuffer, buffer); + } + } + + [Fact] + public void FlushAsyncWithCanceledToken() + { + using (FileStream fs = File.OpenWrite(GetTestFilePath())) + { + Assert.True(fs.FlushAsync(new CancellationToken(true)).IsCanceled); + } + } + + } +} diff --git a/src/System.IO.FileSystem/tests/FileStream/Flush_toDisk.cs b/src/System.IO.FileSystem/tests/FileStream/Flush_toDisk.cs deleted file mode 100644 index d8e868a28bce..000000000000 --- a/src/System.IO.FileSystem/tests/FileStream/Flush_toDisk.cs +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System; -using System.IO; -using Xunit; - -namespace System.IO.FileSystem.Tests -{ - public class FileStream_Flush_toDisk : FileSystemTest - { - [Fact] - public void FlushThrowsForDisposedStream() - { - using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) - { - fs.Dispose(); - Assert.Throws(() => fs.Flush(false)); - Assert.Throws(() => fs.Flush(true)); - } - } - - [Fact] - public void BasicFlushFunctionality() - { - using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) - { - fs.WriteByte(0); - fs.Flush(false); - - fs.WriteByte(0xFF); - fs.Flush(true); - } - } - - [Fact] - public void FlushOnReadOnlyFileDoesNotThrow() - { - string fileName = GetTestFilePath(); - using (FileStream fs = new FileStream(fileName, FileMode.Create)) - { - fs.WriteByte(0); - } - - using (FileStream fs = new FileStream(fileName, FileMode.Open)) - { - fs.Flush(false); - fs.Flush(true); - } - } - - private class MyFileStream : FileStream - { - public MyFileStream(string path, FileMode mode) - : base(path, mode) - { } - - public Action FlushToDiskMethod { get; set; } - - public override void Flush(bool flushToDisk) - { - if (null != FlushToDiskMethod) - { - FlushToDiskMethod(flushToDisk); - } - - base.Flush(flushToDisk); - } - } - - [Fact] - public void FlushCallsFlush_toDisk_false() - { - bool called = false; - - using (MyFileStream fs = new MyFileStream(GetTestFilePath(), FileMode.Create)) - { - fs.FlushToDiskMethod = (flushToDisk) => - { - Assert.False(flushToDisk); - called = true; - }; - fs.Flush(); - Assert.True(called); - } - } - } -} diff --git a/src/System.IO.FileSystem/tests/FileStream/Pipes.cs b/src/System.IO.FileSystem/tests/FileStream/Pipes.cs new file mode 100644 index 000000000000..e1a14f8793fe --- /dev/null +++ b/src/System.IO.FileSystem/tests/FileStream/Pipes.cs @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Microsoft.Win32.SafeHandles; +using System.IO.Pipes; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.FileSystem.Tests +{ + public class Pipes : FileSystemTest + { + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task AnonymousPipeWriteViaFileStream(bool asyncWrites) + { + using (var server = new AnonymousPipeServerStream(PipeDirection.In)) + { + Task serverTask = Task.Run(() => + { + for (int i = 0; i < 6; i++) + Assert.Equal(i, server.ReadByte()); + }); + + using (var client = new FileStream(new SafeFileHandle(server.ClientSafePipeHandle.DangerousGetHandle(), false), FileAccess.Write, bufferSize: 3)) + { + var data = new[] { new byte[] { 0, 1 }, new byte[] { 2, 3 }, new byte[] { 4, 5 } }; + foreach (byte[] arr in data) + { + if (asyncWrites) + await client.WriteAsync(arr, 0, arr.Length); + else + client.Write(arr, 0, arr.Length); + } + } + + await serverTask; + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task AnonymousPipeReadViaFileStream(bool asyncReads) + { + using (var server = new AnonymousPipeServerStream(PipeDirection.Out)) + { + Task serverTask = server.WriteAsync(new byte[] { 0, 1, 2, 3, 4, 5 }, 0, 6); + + using (var client = new FileStream(new SafeFileHandle(server.ClientSafePipeHandle.DangerousGetHandle(), false), FileAccess.Read, bufferSize: 3)) + { + var arr = new byte[1]; + for (int i = 0; i < 6; i++) + { + Assert.Equal(1, asyncReads ? + await client.ReadAsync(arr, 0, 1) : + client.Read(arr, 0, 1)); + Assert.Equal(i, arr[0]); + } + } + + await serverTask; + } + } + + [PlatformSpecific(PlatformID.Windows)] // Uses P/Invokes to create async pipe handle + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task NamedPipeWriteViaAsyncFileStream(bool asyncWrites) + { + string name = Guid.NewGuid().ToString("N"); + using (var server = new NamedPipeServerStream(name, PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous)) + { + Task serverTask = Task.Run(async () => + { + await server.WaitForConnectionAsync(); + for (int i = 0; i < 6; i++) + Assert.Equal(i, server.ReadByte()); + }); + + WaitNamedPipeW(@"\\.\pipe\" + name, -1); + using (SafeFileHandle clientHandle = CreateFileW(@"\\.\pipe\" + name, GENERIC_WRITE, FileShare.None, IntPtr.Zero, FileMode.Open, (int)PipeOptions.Asynchronous, IntPtr.Zero)) + using (var client = new FileStream(clientHandle, FileAccess.Write, bufferSize: 3, isAsync: true)) + { + var data = new[] { new byte[] { 0, 1 }, new byte[] { 2, 3 }, new byte[] { 4, 5 } }; + foreach (byte[] arr in data) + { + if (asyncWrites) + await client.WriteAsync(arr, 0, arr.Length); + else + client.Write(arr, 0, arr.Length); + } + } + + await serverTask; + } + } + + [PlatformSpecific(PlatformID.Windows)] // Uses P/Invokes to create async pipe handle + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task NamedPipeReadViaAsyncFileStream(bool asyncReads) + { + string name = Guid.NewGuid().ToString("N"); + using (var server = new NamedPipeServerStream(name, PipeDirection.Out, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous)) + { + Task serverTask = Task.Run(async () => + { + await server.WaitForConnectionAsync(); + await server.WriteAsync(new byte[] { 0, 1, 2, 3, 4, 5 }, 0, 6); + }); + + WaitNamedPipeW(@"\\.\pipe\" + name, -1); + using (SafeFileHandle clientHandle = CreateFileW(@"\\.\pipe\" + name, GENERIC_READ, FileShare.None, IntPtr.Zero, FileMode.Open, (int)PipeOptions.Asynchronous, IntPtr.Zero)) + using (var client = new FileStream(clientHandle, FileAccess.Read, bufferSize: 3, isAsync: true)) + { + var arr = new byte[1]; + for (int i = 0; i < 6; i++) + { + Assert.Equal(1, asyncReads ? + await client.ReadAsync(arr, 0, 1) : + client.Read(arr, 0, 1)); + Assert.Equal(i, arr[0]); + } + } + + await serverTask; + } + } + + #region Windows P/Invokes + // We need to P/Invoke to test the named pipe async behavior with FileStream + // because NamedPipeClientStream internally binds the created handle, + // and that then prevents FileStream's constructor from working with the handle + // when trying to set isAsync to true. + + [DllImport("kernel32.dll", CharSet = CharSet.Unicode, SetLastError = true)] + [return: MarshalAs(UnmanagedType.Bool)] + public static extern bool WaitNamedPipeW(string name, int timeout); + + [DllImport("kernel32.dll", CharSet = CharSet.Unicode, SetLastError = true)] + internal static extern SafeFileHandle CreateFileW( + string lpFileName, int dwDesiredAccess, FileShare dwShareMode, + IntPtr securityAttrs, FileMode dwCreationDisposition, int dwFlagsAndAttributes, IntPtr hTemplateFile); + + internal const int GENERIC_READ = unchecked((int)0x80000000); + internal const int GENERIC_WRITE = 0x40000000; + #endregion + } +} diff --git a/src/System.IO.FileSystem/tests/FileStream/WriteAsync.cs b/src/System.IO.FileSystem/tests/FileStream/WriteAsync.cs index 7fcf8b6b8a87..789841e7f8a1 100644 --- a/src/System.IO.FileSystem/tests/FileStream/WriteAsync.cs +++ b/src/System.IO.FileSystem/tests/FileStream/WriteAsync.cs @@ -2,8 +2,10 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Collections.Generic; using System.IO; using System.Linq; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -270,6 +272,160 @@ public async Task WriteAsyncCancelledFile() } } + [Fact] + public async void WriteAsyncInternalBufferOverflow() + { + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create, FileAccess.Write, FileShare.None, 3, useAsync: true)) + { + // Fill buffer; should trigger flush of full buffer, no additional I/O + await fs.WriteAsync(TestBuffer, 0, 3); + Assert.True(fs.Length == 3); + + // Add to next buffer + await fs.WriteAsync(TestBuffer, 0, 1); + Assert.True(fs.Length == 4); + + // Complete that buffer; should trigger flush of full buffer, no additional I/O + await fs.WriteAsync(TestBuffer, 0, 2); + Assert.True(fs.Length == 6); + + // Add to next buffer + await fs.WriteAsync(TestBuffer, 0, 2); + Assert.True(fs.Length == 8); + + // Overflow buffer with amount that could fit in a buffer; should trigger a flush, with additional I/O + await fs.WriteAsync(TestBuffer, 0, 2); + Assert.True(fs.Length == 10); + + // Overflow buffer with amount that couldn't fit in a buffer; shouldn't be anything to flush, just an additional I/O + await fs.WriteAsync(TestBuffer, 0, 4); + Assert.True(fs.Length == 14); + } + } + + public static IEnumerable MemberData_FileStreamAsyncWriting() + { + foreach (bool useAsync in new[] { true, false }) + { + if (useAsync && !RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + // [ActiveIssue(812, PlatformID.AnyUnix)] + // We don't have a special async I/O implementation in FileStream on Unix. + continue; + } + + foreach (bool preSize in new[] { true, false }) + { + foreach (bool cancelable in new[] { true, false }) + { + yield return new object[] { useAsync, preSize, false, cancelable, 0x1000, 0x100, 100 }; + yield return new object[] { useAsync, preSize, false, cancelable, 0x1, 0x1, 1000 }; + yield return new object[] { useAsync, preSize, true, cancelable, 0x2, 0x100, 100 }; + yield return new object[] { useAsync, preSize, false, cancelable, 0x4000, 0x10, 100 }; + yield return new object[] { useAsync, preSize, true, cancelable, 0x1000, 99999, 10 }; + } + } + } + } + + [Theory] + [MemberData("MemberData_FileStreamAsyncWriting")] + public async Task ManyConcurrentWriteAsyncs( + bool useAsync, bool presize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites) + { + long totalLength = writeSize * numWrites; + var expectedData = new byte[totalLength]; + new Random(42).NextBytes(expectedData); + CancellationToken cancellationToken = cancelable ? new CancellationTokenSource().Token : CancellationToken.None; + + string path = GetTestFilePath(); + using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.ReadWrite, FileShare.None, bufferSize, useAsync)) + { + if (presize) + { + fs.SetLength(totalLength); + } + if (exposeHandle) + { + var ignored = fs.SafeFileHandle; + } + + Task[] writes = new Task[numWrites]; + for (int i = 0; i < numWrites; i++) + { + writes[i] = fs.WriteAsync(expectedData, i * writeSize, writeSize, cancellationToken); + Assert.Null(writes[i].Exception); + if (useAsync) + { + Assert.Equal((i + 1) * writeSize, fs.Position); + } + } + + await Task.WhenAll(writes); + } + + byte[] actualData = File.ReadAllBytes(path); + Assert.Equal(expectedData.Length, actualData.Length); + Assert.Equal(expectedData, actualData); + } + + [Theory] + [MemberData("MemberData_FileStreamAsyncWriting")] + public async Task CopyToAsyncBetweenFileStreams( + bool useAsync, bool preSize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites) + { + long totalLength = writeSize * numWrites; + var expectedData = new byte[totalLength]; + new Random(42).NextBytes(expectedData); + + string srcPath = GetTestFilePath(); + File.WriteAllBytes(srcPath, expectedData); + + string dstPath = GetTestFilePath(); + using (FileStream src = new FileStream(srcPath, FileMode.Open, FileAccess.Read, FileShare.None, bufferSize, useAsync)) + using (FileStream dst = new FileStream(dstPath, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize, useAsync)) + { + await src.CopyToAsync(dst, writeSize, cancelable ? new CancellationTokenSource().Token : CancellationToken.None); + } + + byte[] actualData = File.ReadAllBytes(dstPath); + Assert.Equal(expectedData.Length, actualData.Length); + Assert.Equal(expectedData, actualData); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task BufferCorrectlyMaintaindWhenReadAndWrite(bool useAsync) + { + string path = GetTestFilePath(); + File.WriteAllBytes(path, TestBuffer); + + using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.ReadWrite, FileShare.None, 2, useAsync)) + { + Assert.Equal(TestBuffer[0], await ReadByteAsync(fs)); + Assert.Equal(TestBuffer[1], await ReadByteAsync(fs)); + Assert.Equal(TestBuffer[2], await ReadByteAsync(fs)); + await fs.WriteAsync(TestBuffer, 0, TestBuffer.Length); + + fs.Position = 0; + Assert.Equal(TestBuffer[0], await ReadByteAsync(fs)); + Assert.Equal(TestBuffer[1], await ReadByteAsync(fs)); + Assert.Equal(TestBuffer[2], await ReadByteAsync(fs)); + for (int i = 0; i < TestBuffer.Length; i++) + { + Assert.Equal(TestBuffer[i], await ReadByteAsync(fs)); + } + } + } + + private static async Task ReadByteAsync(FileStream fs) + { + byte[] oneByte = new byte[1]; + Assert.Equal(1, await fs.ReadAsync(oneByte, 0, 1)); + return oneByte[0]; + } + [Fact, OuterLoop] public async Task WriteAsyncMiniStress() { diff --git a/src/System.IO.FileSystem/tests/System.IO.FileSystem.Tests.csproj b/src/System.IO.FileSystem/tests/System.IO.FileSystem.Tests.csproj index f70b4c1e1dab..e28cb4c77fc3 100644 --- a/src/System.IO.FileSystem/tests/System.IO.FileSystem.Tests.csproj +++ b/src/System.IO.FileSystem/tests/System.IO.FileSystem.Tests.csproj @@ -63,8 +63,8 @@ - - + +