Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 117 additions & 37 deletions src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ internal sealed partial class Win32FileStream : FileStreamBase
private long _appendStart;// When appending, prevent overwriting file.

private Task<int> _lastSynchronouslyCompletedTask = null;
private Task _activeBufferOperation = null;

[System.Security.SecuritySafeCritical]
public Win32FileStream(String path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options, FileStream parent) : base(parent)
Expand Down Expand Up @@ -331,6 +332,10 @@ private unsafe void VerifyHandleIsSync()
}
}

private bool HasActiveBufferOperation
{
get { return _activeBufferOperation != null && !_activeBufferOperation.IsCompleted; }
}

public override bool CanRead
{
Expand Down Expand Up @@ -520,6 +525,26 @@ private void FlushRead()
_readLen = 0;
}

// Returns a task that flushes the internal write buffer
private Task FlushWriteAsync(CancellationToken cancellationToken)
{
Debug.Assert(_isAsync);
Debug.Assert(_readPos == 0 && _readLen == 0, "FileStream: Read buffer must be empty in FlushWriteAsync!");

// If the buffer is already flushed, don't spin up the OS write
if (_writePos == 0) return Task.CompletedTask;

Task flushTask = WriteInternalCoreAsync(_buffer, 0, _writePos, cancellationToken);
_writePos = 0;

// Update the active buffer operation
_activeBufferOperation = HasActiveBufferOperation ?
Task.WhenAll(_activeBufferOperation, flushTask) :
flushTask;

return flushTask;
}

// Writes are buffered. Anytime the buffer fills up
// (_writePos + delta > _bufferSize) or the buffer switches to reading
// and there is left over data (_writePos > 0), this function must be called.
Expand All @@ -529,7 +554,7 @@ private void FlushWrite(bool calledFromFinalizer)

if (_isAsync)
{
Task writeTask = WriteInternalCoreAsync(_buffer, 0, _writePos, CancellationToken.None);
Task writeTask = FlushWriteAsync(CancellationToken.None);
// With our Whidbey async IO & overlapped support for AD unloads,
// we don't strictly need to block here to release resources
// since that support takes care of the pinning & freeing the
Expand Down Expand Up @@ -1259,52 +1284,107 @@ private Task WriteInternalAsync(byte[] array, int offset, int numBytes, Cancella
if (!_parent.CanWrite) throw __Error.GetWriteNotSupported();

Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
Debug.Assert(!_isPipe || (_readPos == 0 && _readLen == 0), "Win32FileStream must not have buffered data here! Pipes should be unidirectional.");

if (_isPipe)
bool writeDataStoredInBuffer = false;
if (!_isPipe) // avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadInternalAsyncCore)
{
// Pipes are tricky, at least when you have 2 different pipes
// that you want to use simultaneously. When redirecting stdout
// & stderr with the Process class, it's easy to deadlock your
// parent & child processes when doing writes 4K at a time. The
// OS appears to use a 4K buffer internally. If you write to a
// pipe that is full, you will block until someone read from
// that pipe. If you try reading from an empty pipe and
// Win32FileStream's ReadAsync blocks waiting for data to fill it's
// internal buffer, you will be blocked. In a case where a child
// process writes to stdout & stderr while a parent process tries
// reading from both, you can easily get into a deadlock here.
// To avoid this deadlock, don't buffer when doing async IO on
// pipes.
Debug.Assert(_readPos == 0 && _readLen == 0, "Win32FileStream must not have buffered data here! Pipes should be unidirectional.");
// Ensure the buffer is clear for writing
if (_writePos == 0)
{
if (_readPos < _readLen)
{
FlushRead();
}
_readPos = 0;
_readLen = 0;
}

if (_writePos > 0)
FlushWrite(false);
// Determine how much space remains in the buffer
int remainingBuffer = _bufferSize - _writePos;
Debug.Assert(remainingBuffer >= 0);

return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
}
// Simple/common case:
// - The write is smaller than our buffer, such that it's worth considering buffering it.
// - There's no active flush operation, such that we don't have to worry about the existing buffer being in use.
// - And the data we're trying to write fits in the buffer, meaning it wasn't already filled by previous writes.
// In that case, just store it in the buffer.
if (numBytes < _bufferSize && !HasActiveBufferOperation && numBytes <= remainingBuffer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if numBytes == _buffersize it skips the buffer? Makes sense; not much point in buffering it if you have to flush it on the next write regardless.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. If you're only ever writing out >= to the buffer size, we can avoid allocating the buffer at all. And even if the buffer needs to be allocated, by skipping the buffer for the == case, we can kick off the async operation now rather than deferring it until later, which is good since we know we want to do this and we may as well overlap it as much as possible with whatever we're going to do next. The synchronous Write already skips the buffer for ==; this brings WriteAsync in line with it.

{
if (_buffer == null)
_buffer = new byte[_bufferSize];

// Handle buffering.
if (_writePos == 0)
{
if (_readPos < _readLen) FlushRead();
_readPos = 0;
_readLen = 0;
Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
_writePos += numBytes;
writeDataStoredInBuffer = true;

// There is one special-but-common case, common because devs often use
// byte[] sizes that are powers of 2 and thus fit nicely into our buffer, which is
// also a power of 2. If after our write the buffer still has remaining space,
// then we're done and can return a completed task now. But if we filled the buffer
// completely, we want to do the asynchronous flush/write as part of this operation
// rather than waiting until the next write that fills the buffer.
if (numBytes != remainingBuffer)
return Task.CompletedTask;

Debug.Assert(_writePos == _bufferSize);
}
}

int n = _bufferSize - _writePos;
if (numBytes <= n)
// At this point, at least one of the following is true:
// 1. There was an active flush operation (it could have completed by now, though).
// 2. The data doesn't fit in the remaining buffer (or it's a pipe and we chose not to try).
// 3. We wrote all of the data to the buffer, filling it.
//
// If there's an active operation, we can't touch the current buffer because it's in use.
// That gives us a choice: we can either allocate a new buffer, or we can skip the buffer
// entirely (even if the data would otherwise fit in it). For now, for simplicity, we do
// the latter; it could also have performance wins due to OS-level optimizations, and we could
// potentially add support for PreAllocatedOverlapped due to having a single buffer. (We can
// switch to allocating a new buffer, potentially experimenting with buffer pooling, should
// performance data suggest it's appropriate.)
//
// If the data doesn't fit in the remaining buffer, it could be because it's so large
// it's greater than the entire buffer size, in which case we'd always skip the buffer,
// or it could be because there's more data than just the space remaining. For the latter
// case, we need to issue an asynchronous write to flush that data, which then turns this into
// the first case above with an active operation.
//
// If we already stored the data, then we have nothing additional to write beyond what
// we need to flush.
//
// In any of these cases, we have the same outcome:
// - If there's data in the buffer, flush it by writing it out asynchronously.
// - Then, if there's any data to be written, issue a write for it concurrently.
// We return a Task that represents one or both.

// Flush the buffer asynchronously if there's anything to flush
Task flushTask = null;
if (_writePos > 0)
{
if (_writePos == 0) _buffer = new byte[_bufferSize];
Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
_writePos += numBytes;

return Task.CompletedTask;
flushTask = FlushWriteAsync(cancellationToken);

// If we already copied all of the data into the buffer,
// simply return the flush task here. Same goes for if the task has
// already completed and was unsuccessful.
if (writeDataStoredInBuffer ||
flushTask.IsFaulted ||
flushTask.IsCanceled)
{
return flushTask;
}
}

if (_writePos > 0)
FlushWrite(false);
Debug.Assert(!writeDataStoredInBuffer);
Debug.Assert(_writePos == 0);

return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
// Finally, issue the write asynchronously, and return a Task that logically
// represents the write operation, including any flushing done.
Task writeTask = WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
return
(flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask :
(writeTask.Status == TaskStatus.RanToCompletion) ? flushTask :
Task.WhenAll(flushTask, writeTask);
}

[System.Security.SecuritySafeCritical] // auto-generated
Expand Down Expand Up @@ -1395,7 +1475,7 @@ private unsafe Task WriteInternalCoreAsync(byte[] bytes, int offset, int numByte
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
}
}
else
else // ERROR_IO_PENDING
{
// Only once the IO is pending do we register for cancellation
completionSource.RegisterForCancellation();
Expand Down
39 changes: 0 additions & 39 deletions src/System.IO.FileSystem/tests/FileStream/Flush.Sharing.cs

This file was deleted.

Loading