From 01110fa798a1f726b806fd445d8d09e4bba26a07 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sun, 20 Mar 2022 23:09:17 -0700 Subject: [PATCH 1/4] Double the size of each segment - This change comes after long observations around how pipelines doesn't work well for copying large blocks of data mainly due to the segment allocation strategy. Today we allocate each segment > minimum segment size < max pool size. This works well if data is being quickly consumed because we can keep memory allocations to a minimum but doesn't work well when there's large chunks of data are being written. This change doubles the segment size based on the previous segment up to 1MB (arbitrary limit). This should make the default behavior work pretty much the same as today but performance of larger writes/reads should improve. --- .../src/System/IO/Pipelines/BufferSegment.cs | 5 +++++ .../src/System/IO/Pipelines/Pipe.cs | 20 +++++++++++-------- .../src/System/IO/Pipelines/PipeOptions.cs | 7 +++++-- .../System/IO/Pipelines/StreamPipeWriter.cs | 17 ++++++++-------- 4 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs index 65379ef68f9a30..e3963ab9e80034 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs @@ -14,6 +14,11 @@ internal sealed class BufferSegment : ReadOnlySequenceSegment private BufferSegment? _next; private int _end; + /// + /// The amount of avaiable memory in the segment. + /// + public int Capacity => AvailableMemory.Length; + /// /// The End represents the offset into AvailableMemory where the range of "active" bytes ends. At the point when the block is leased /// the End is guaranteed to be equal to Start. The value of Start may be assigned anywhere between 0 and diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 0c743f4c74a678..51f35f8ee75a43 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -178,7 +178,7 @@ private void AllocateWriteHeadSynchronized(int sizeHint) if (_writingHead == null) { // We need to allocate memory to write since nobody has written before - BufferSegment newSegment = AllocateSegment(sizeHint); + BufferSegment newSegment = AllocateSegment(MinimumSegmentSize, sizeHint); // Set all the pointers _writingHead = _readHead = _readTail = newSegment; @@ -197,7 +197,9 @@ private void AllocateWriteHeadSynchronized(int sizeHint) _writingHeadBytesBuffered = 0; } - BufferSegment newSegment = AllocateSegment(sizeHint); + // Double the minimum segment size on subsequent segements + int newSegmentSize = Math.Min(PipeOptions.MaximumSegmentSize, _writingHead.Capacity * 2); + BufferSegment newSegment = AllocateSegment(newSegmentSize, sizeHint); _writingHead.SetNext(newSegment); _writingHead = newSegment; @@ -206,7 +208,7 @@ private void AllocateWriteHeadSynchronized(int sizeHint) } } - private BufferSegment AllocateSegment(int sizeHint) + private BufferSegment AllocateSegment(int minimumSegmentSize, int sizeHint) { Debug.Assert(sizeHint >= 0); BufferSegment newSegment = CreateSegmentUnsynchronized(); @@ -223,12 +225,12 @@ private BufferSegment AllocateSegment(int sizeHint) if (sizeHint <= maxSize) { // Use the specified pool as it fits. Specified pool is not null as maxSize == -1 if _pool is null. - newSegment.SetOwnedMemory(pool!.Rent(GetSegmentSize(sizeHint, maxSize))); + newSegment.SetOwnedMemory(pool!.Rent(GetSegmentSize(minimumSegmentSize, sizeHint, maxSize))); } else { // Use the array pool - int sizeToRequest = GetSegmentSize(sizeHint); + int sizeToRequest = GetSegmentSize(minimumSegmentSize, sizeHint); newSegment.SetOwnedMemory(ArrayPool.Shared.Rent(sizeToRequest)); } @@ -237,10 +239,11 @@ private BufferSegment AllocateSegment(int sizeHint) return newSegment; } - private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) + private int GetSegmentSize(int minimumSegmentSize, int sizeHint, int maxBufferSize = int.MaxValue) { // First we need to handle case where hint is smaller than minimum segment size - sizeHint = Math.Max(MinimumSegmentSize, sizeHint); + sizeHint = Math.Max(minimumSegmentSize, sizeHint); + // After that adjust it to fit into pools max buffer size int adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint); return adjustedToMaximumSize; @@ -1090,7 +1093,8 @@ private void WriteMultiSegment(ReadOnlySpan source) // This is optimized to use pooled memory. That's why we pass 0 instead of // source.Length - BufferSegment newSegment = AllocateSegment(0); + int newSegmentSize = Math.Min(PipeOptions.MaximumSegmentSize, _writingHead.Capacity * 2); + BufferSegment newSegment = AllocateSegment(newSegmentSize, 0); _writingHead.SetNext(newSegment); _writingHead = newSegment; diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index 998845f804dd33..b0bc6ce7ee6cee 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -11,6 +11,9 @@ public class PipeOptions { private const int DefaultMinimumSegmentSize = 4096; + // Arbitrary 1MB max segment size + internal const int MaximumSegmentSize = 1024 * 1024; + /// Gets the default instance of . /// A object initialized with default parameters. public static PipeOptions Default { get; } = new PipeOptions(); @@ -38,10 +41,10 @@ public PipeOptions( // to let users specify the maximum buffer size, so we pick a reasonable number based on defaults. They can influence // how much gets buffered by increasing the minimum segment size. - // With a defaukt segment size of 4K this maps to 16K + // With a default minimum segment size of 4 this maps to 60K InitialSegmentPoolSize = 4; - // With a defaukt segment size of 4K this maps to 1MB. If the pipe has large segments this will be bigger than 1MB... + // With a default minimum segment size of 4K this maps to 1MB. If the pipe has large segments this will be bigger than 1MB... MaxSegmentPoolSize = 256; // By default, we'll throttle the writer at 64K of buffered data diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs index bfd90adeefbc6a..5e5c35616394ab 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs @@ -115,7 +115,7 @@ private void AllocateMemory(int sizeHint) if (_head == null) { // We need to allocate memory to write since nobody has written before - BufferSegment newSegment = AllocateSegment(sizeHint); + BufferSegment newSegment = AllocateSegment(_minimumBufferSize, sizeHint); // Set all the pointers _head = _tail = newSegment; @@ -135,7 +135,8 @@ private void AllocateMemory(int sizeHint) _tailBytesBuffered = 0; } - BufferSegment newSegment = AllocateSegment(sizeHint); + int newSegmentSize = Math.Min(PipeOptions.MaximumSegmentSize, _tail.Capacity * 2); + BufferSegment newSegment = AllocateSegment(newSegmentSize, sizeHint); _tail.SetNext(newSegment); _tail = newSegment; @@ -143,7 +144,7 @@ private void AllocateMemory(int sizeHint) } } - private BufferSegment AllocateSegment(int sizeHint) + private BufferSegment AllocateSegment(int minimumBufferSize, int sizeHint) { Debug.Assert(sizeHint >= 0); BufferSegment newSegment = CreateSegmentUnsynchronized(); @@ -152,12 +153,12 @@ private BufferSegment AllocateSegment(int sizeHint) if (sizeHint <= maxSize) { // Use the specified pool as it fits. Specified pool is not null as maxSize == -1 if _pool is null. - newSegment.SetOwnedMemory(_pool!.Rent(GetSegmentSize(sizeHint, maxSize))); + newSegment.SetOwnedMemory(_pool!.Rent(GetSegmentSize(minimumBufferSize, sizeHint, maxSize))); } else { // Use the array pool - int sizeToRequest = GetSegmentSize(sizeHint); + int sizeToRequest = GetSegmentSize(minimumBufferSize, sizeHint); newSegment.SetOwnedMemory(ArrayPool.Shared.Rent(sizeToRequest)); } @@ -166,12 +167,12 @@ private BufferSegment AllocateSegment(int sizeHint) return newSegment; } - private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) + private int GetSegmentSize(int minimumBufferSize, int sizeHint, int maxBufferSize = int.MaxValue) { // First we need to handle case where hint is smaller than minimum segment size - sizeHint = Math.Max(_minimumBufferSize, sizeHint); + sizeHint = Math.Max(minimumBufferSize, sizeHint); // After that adjust it to fit into pools max buffer size - var adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint); + int adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint); return adjustedToMaximumSize; } From 21ee822079de6acf5df3e455e8677b162a625163 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sun, 20 Mar 2022 23:15:51 -0700 Subject: [PATCH 2/4] Fixed tests and comment --- .../src/System/IO/Pipelines/PipeOptions.cs | 2 +- src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index b0bc6ce7ee6cee..6ae4de91bf4788 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -44,7 +44,7 @@ public PipeOptions( // With a default minimum segment size of 4 this maps to 60K InitialSegmentPoolSize = 4; - // With a default minimum segment size of 4K this maps to 1MB. If the pipe has large segments this will be bigger than 1MB... + // With a default minimum segment size of 4K this maps to ~250MB. MaxSegmentPoolSize = 256; // By default, we'll throttle the writer at 64K of buffered data diff --git a/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs b/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs index afab95b464b9d4..73b824bd5b6efc 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs @@ -101,7 +101,7 @@ public async Task MultipleCompleteReaderWriterCauseDisposeOnlyOnce() } [Fact] - public async Task RentsMinimumSegmentSize() + public async Task DoublesMinimumSegmentSize() { var pool = new DisposeTrackingBufferPool(); var writeSize = 512; @@ -118,7 +118,7 @@ public async Task RentsMinimumSegmentSize() pipe.Reader.Complete(); pipe.Writer.Complete(); - Assert.Equal(2020, ensuredSize); + Assert.Equal(4040, ensuredSize); Assert.Equal(2020, allocatedSize); } From d4f728ce2b36530ef9e646ab841805b0db85042c Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sun, 17 Apr 2022 22:02:22 -0700 Subject: [PATCH 3/4] Make GetSegmentSize static --- .../System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 51f35f8ee75a43..27f2745f1d2863 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -239,7 +239,7 @@ private BufferSegment AllocateSegment(int minimumSegmentSize, int sizeHint) return newSegment; } - private int GetSegmentSize(int minimumSegmentSize, int sizeHint, int maxBufferSize = int.MaxValue) + private static int GetSegmentSize(int minimumSegmentSize, int sizeHint, int maxBufferSize = int.MaxValue) { // First we need to handle case where hint is smaller than minimum segment size sizeHint = Math.Max(minimumSegmentSize, sizeHint); From 93856017263686b4a1dd6563e255d2db5473c77f Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sun, 17 Apr 2022 22:58:46 -0700 Subject: [PATCH 4/4] Again --- .../src/System/IO/Pipelines/StreamPipeWriter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs index 5e5c35616394ab..028801981b7e54 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs @@ -167,7 +167,7 @@ private BufferSegment AllocateSegment(int minimumBufferSize, int sizeHint) return newSegment; } - private int GetSegmentSize(int minimumBufferSize, int sizeHint, int maxBufferSize = int.MaxValue) + private static int GetSegmentSize(int minimumBufferSize, int sizeHint, int maxBufferSize = int.MaxValue) { // First we need to handle case where hint is smaller than minimum segment size sizeHint = Math.Max(minimumBufferSize, sizeHint);