-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Handle internal overflow of the write buffer in WriteAsync by introdu… #2802
Changes from all commits
cb1db85
17c5fc0
5e05efc
84a9ce4
cfeefdb
21da6c0
521196b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ internal sealed partial class Win32FileStream : FileStreamBase | |
| private long _appendStart;// When appending, prevent overwriting file. | ||
|
|
||
| private Task<int> _lastSynchronouslyCompletedTask = null; | ||
| private Task _activeBufferOperation = null; | ||
|
|
||
| [System.Security.SecuritySafeCritical] | ||
| public Win32FileStream(String path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options, FileStream parent) : base(parent) | ||
|
|
@@ -331,6 +332,10 @@ private unsafe void VerifyHandleIsSync() | |
| } | ||
| } | ||
|
|
||
| // True while a tracked operation on the internal buffer exists and has not yet completed. | ||
| private bool HasActiveBufferOperation | ||
| { | ||
| get | ||
| { | ||
| Task trackedOperation = _activeBufferOperation; | ||
| return trackedOperation != null && !trackedOperation.IsCompleted; | ||
| } | ||
| } | ||
|
|
||
| public override bool CanRead | ||
| { | ||
|
|
@@ -520,6 +525,26 @@ private void FlushRead() | |
| _readLen = 0; | ||
| } | ||
|
|
||
| // Starts an asynchronous write of whatever currently sits in the internal write buffer | ||
| // and returns the task for that write (an already-completed task when nothing is buffered). | ||
| private Task FlushWriteAsync(CancellationToken cancellationToken) | ||
| { | ||
| Debug.Assert(_isAsync); | ||
| Debug.Assert(_readPos == 0 && _readLen == 0, "FileStream: Read buffer must be empty in FlushWriteAsync!"); | ||
|
|
||
| // Nothing buffered, so there is no OS write to issue | ||
| if (_writePos == 0) | ||
| { | ||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| // Issue the write before resetting _writePos, since the write length is taken from it | ||
| Task pendingFlush = WriteInternalCoreAsync(_buffer, 0, _writePos, cancellationToken); | ||
| _writePos = 0; | ||
|
|
||
| // Track this flush; if an earlier buffer operation is still running, track both together | ||
| if (HasActiveBufferOperation) | ||
| { | ||
| _activeBufferOperation = Task.WhenAll(_activeBufferOperation, pendingFlush); | ||
| } | ||
| else | ||
| { | ||
| _activeBufferOperation = pendingFlush; | ||
| } | ||
|
|
||
| return pendingFlush; | ||
| } | ||
|
|
||
| // Writes are buffered. Anytime the buffer fills up | ||
| // (_writePos + delta > _bufferSize) or the buffer switches to reading | ||
| // and there is left over data (_writePos > 0), this function must be called. | ||
|
|
@@ -529,7 +554,7 @@ private void FlushWrite(bool calledFromFinalizer) | |
|
|
||
| if (_isAsync) | ||
| { | ||
| Task writeTask = WriteInternalCoreAsync(_buffer, 0, _writePos, CancellationToken.None); | ||
| Task writeTask = FlushWriteAsync(CancellationToken.None); | ||
| // With our Whidbey async IO & overlapped support for AD unloads, | ||
| // we don't strictly need to block here to release resources | ||
| // since that support takes care of the pinning & freeing the | ||
|
|
@@ -1277,33 +1302,83 @@ private Task WriteInternalAsync(byte[] array, int offset, int numBytes, Cancella | |
| // pipes. | ||
| Debug.Assert(_readPos == 0 && _readLen == 0, "Win32FileStream must not have buffered data here! Pipes should be unidirectional."); | ||
|
|
||
| // If there's data in the write buffer, flush it before starting a new write | ||
| if (_writePos > 0) | ||
| FlushWrite(false); | ||
| FlushWriteAsync(cancellationToken); | ||
|
|
||
| return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken); | ||
| } | ||
|
|
||
| // Handle buffering. | ||
| // Ensure the buffer is clear for writing | ||
| if (_writePos == 0) | ||
| { | ||
| if (_readPos < _readLen) FlushRead(); | ||
| _readPos = 0; | ||
| _readLen = 0; | ||
| } | ||
|
|
||
| int n = _bufferSize - _writePos; | ||
| if (numBytes <= n) | ||
| // There are a few different cases to handle when performing the write operation | ||
| // surrounding the internal buffer. Buffer flush operations can be issued asynchronously | ||
| // so the state of any async operation that touches the internal buffer must be tracked | ||
| // to ensure there are no data races at play. | ||
| // | ||
| // 1. No active async buffer op and numBytes fits in existing buffer: | ||
| // Simply perform a memory copy operation and return synchronously. | ||
| // | ||
| // 2. Active async buffer op and numBytes <= _bufferSize | ||
| // Since there's an active buffer operation, a new buffer needs to be allocated | ||
| // to write to in order to ensure the existing buffer isn't modified mid-write. | ||
| // The incoming write can then be copied into the new buffer. | ||
| // | ||
| // 3. No active async buffer and numBytes too large for buffer | ||
| // If there's buffered data, issue a flush operation. Then, regardless, issue the | ||
| // incoming write since it can't be buffered. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way this is phrased makes me as a reader think that the write is only issued if there's buffered data. I'd suggest rephrasing it just slightly: |
||
| // | ||
| // 4. Active async buffer op and numBytes too large for buffer | ||
| // If there's buffered data, attach it to the existing async operation using Task.WhenAll | ||
| // and then directly issue the incoming write. | ||
| // | ||
| // Note: since _writePos is reset synchronously when calling FlushWriteAsync, it can be | ||
| // assumed that if _writePos > 0 and there's an active async flush operation, the buffer | ||
| // was reallocated since that operation was initiated. | ||
|
|
||
| // Case 1 - no active buffer op & fits in remaining size | ||
| int remainingBuffer = _bufferSize - _writePos; | ||
| if (!HasActiveBufferOperation && numBytes < remainingBuffer) | ||
| { | ||
| if (_writePos == 0) _buffer = new byte[_bufferSize]; | ||
| if (_buffer == null) _buffer = new byte[_bufferSize]; | ||
|
|
||
| Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes); | ||
| _writePos += numBytes; | ||
|
|
||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| // Case 2 - active buffer op & fits in new buffer | ||
| if (HasActiveBufferOperation && numBytes < _bufferSize) | ||
| { | ||
| _buffer = new byte[_bufferSize]; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So does this mean that every time we hit this path of going past the end of the current buffer, we're going to allocate a new one? That seems quite costly. I wonder if some small buffer pool is warranted, e.g. we allow only up to N buffer allocations, which are pooled, and if we try to get a buffer from the pool and we've already allocated N, then we fall back to synchronous flushing. Or maybe in the case of overflowing the buffer, we support growing the buffer used to be the size of what's already cached plus the size of whatever the developer supplied (we could still do flushes when we hit the original _bufferSize, but we'd have enough room to store all of the user supplied data). In degenerate situations, the buffer could get extremely large, but maybe for such cases we set a threshold and do synchronous flushes beyond that. @ericstj, you've probably thought about this before...? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I think I may be making this more complicated than it needs to be (Friday afternoon brain). What you have is probably fine, except that I'm not sure it's worth the extra buffer allocation for this case. What if we just said:
Essentially I think that's exactly what you have, except just deleting this additional case you added. Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's totally fine with me, I added that case to try and avoid a second write operation, and figured the allocation is fine since it's already reallocated in the synchronous case above when the buffer is empty. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's true. Let me think about this a bit over the weekend. This would all get much simpler if we said that all async operations are serialized and you can't do synchronous operations while an async operation is in flight. Then we shouldn't need any additional buffer allocations. |
||
|
|
||
| Buffer.BlockCopy(array, offset, _buffer, 0, numBytes); | ||
| _writePos = numBytes; | ||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| // Case 3 - Incoming write can't be buffered and there's existing data in the buffer | ||
| if (_writePos > 0) | ||
| FlushWrite(false); | ||
| { | ||
| Task flushTask = FlushWriteAsync(cancellationToken); | ||
|
|
||
| // If the task has already completed and was unsuccessful, avoid issuing the write. | ||
| if (flushTask.IsCanceled || flushTask.IsFaulted) return flushTask; | ||
|
|
||
| // Return both the flush and write operations. It's not necessary that they complete in a | ||
| // specific order since the internal position is updated synchronously when crafting the | ||
| // tasks, but consumers will want to know if either fails so the health of the stream is known. | ||
| return Task.WhenAll(flushTask, WriteInternalCoreAsync(array, offset, numBytes, cancellationToken)); | ||
| } | ||
|
|
||
| // Case 4 - Incoming write can't be buffered and there's no existing buffered data, so just | ||
| // issue the OS write directly. | ||
| return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -270,6 +270,30 @@ public async Task WriteAsyncCancelledFile() | |
| } | ||
| } | ||
|
|
||
| // Checks the reported file length after async writes that fill, overflow, and exceed | ||
| // a FileStream created with a bufferSize of 2. | ||
| // Returns Task (not void), matching the other async tests in this file, so the test | ||
| // runner can await completion and observe any failure. | ||
| [Fact] | ||
| public async Task WriteAsyncInternalBufferOverflow() | ||
| { | ||
| // A bufferSize of 2 lets very small writes reach every overflow path | ||
| using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create, FileAccess.Write, FileShare.None, 2)) | ||
| { | ||
| // Fill existing buffer | ||
| await fs.WriteAsync(TestBuffer, 0, 2); | ||
| Assert.Equal(2, fs.Length); | ||
|
|
||
| // Overflow into next buffer | ||
| await fs.WriteAsync(TestBuffer, 0, 1); | ||
| Assert.Equal(3, fs.Length); | ||
|
|
||
| // Overflow bufferSize * 2 | ||
| await fs.WriteAsync(TestBuffer, 0, 5); | ||
| Assert.Equal(8, fs.Length); | ||
|
|
||
| // Overflow bufferSize * 2 with empty buffer | ||
| await fs.WriteAsync(TestBuffer, 0, 5); | ||
| Assert.Equal(13, fs.Length); | ||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'd be good to have a test that tries to issue many writes/flushes to be run concurrently. The test you have and this should also verify that Position reflects the correct value immediately after the synchronous call to the XxAsync method returns. Note that the OS may actually choose to issue writes synchronously, and I believe it'll do so if the write would require extending the size of the file, so it'd be good to have the test verify behavior both with a zero-length file to start and with a file presized for all data needed (this could be done as a theory with a Boolean argument that controls whether you start by setting the length). I believe it'll also do writes synchronously for small writes, so it'd be good to also have a test that does large writes, like 100K at a time. |
||
|
|
||
| [Fact, OuterLoop] | ||
| public async Task WriteAsyncMiniStress() | ||
| { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It occurred to me as I was reading how you wrote this out that there's another option for this case: simply skip the buffer entirely and just issue the async write, rather than allocating a new buffer and copying into it... basically making it the same as the 4th case. In other words, we use the buffer if there's room and there's no active buffer operation, otherwise we issue the write directly. Might be worth noting that, and we could experiment with the change separately to see which approach yields the best results.