Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/System.IO.Pipelines/System.IO.Pipelines.sln
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ Global
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Debug|Any CPU.Build.0 = netcoreapp-Debug|Any CPU
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Release|Any CPU.ActiveCfg = netcoreapp-Release|Any CPU
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Release|Any CPU.Build.0 = netcoreapp-Release|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.ActiveCfg = netstandard-Debug|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.Build.0 = netstandard-Debug|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.ActiveCfg = netstandard-Release|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.Build.0 = netstandard-Release|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.ActiveCfg = netcoreapp-Debug|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.Build.0 = netcoreapp-Debug|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.ActiveCfg = netcoreapp-Release|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.Build.0 = netcoreapp-Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
2 changes: 1 addition & 1 deletion src/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\ref\System.IO.Pipelines.csproj">
<SupportedFramework>net461;netcoreapp2.0;uap10.0.16299;$(AllXamarinFrameworks)</SupportedFramework>
<SupportedFramework>netcoreapp3.0;$(UAPvNextTFM);$(AllXamarinFrameworks)</SupportedFramework>
</ProjectReference>
<ProjectReference Include="..\src\System.IO.Pipelines.csproj" />
<HarvestIncludePaths Include="lib/netstandard1.3" />
Expand Down
4 changes: 4 additions & 0 deletions src/System.IO.Pipelines/ref/Configurations.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
<Project>
<PropertyGroup>
<PackageConfigurations>
netcoreapp;
</PackageConfigurations>
<BuildConfigurations>
$(PackageConfigurations);
netstandard;
</BuildConfigurations>
</PropertyGroup>
Expand Down
13 changes: 8 additions & 5 deletions src/System.IO.Pipelines/ref/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ProjectGuid>{9C524CA0-92FF-437B-B568-BCE8A794A69A}</ProjectGuid>
<Configurations>netstandard-Debug;netstandard-Release</Configurations>
<!-- We only plan to use this ref in netcoreapp. For all other netstandard compatible frameworks we should use the lib
asset instead. -->
<PackageTargetFramework Condition="'$(TargetGroup)' == 'netstandard'">netcoreapp2.0</PackageTargetFramework>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
</PropertyGroup>
<ItemGroup>
<Compile Include="System.IO.Pipelines.cs" />
Expand All @@ -16,4 +13,10 @@
<ItemGroup>
<ProjectReference Include="..\..\System.Buffers\ref\System.Buffers.csproj" />
</ItemGroup>
</Project>
<ItemGroup Condition="'$(TargetGroup)'=='netstandard'">
<ProjectReference Include="..\..\Microsoft.Bcl.AsyncInterfaces\ref\Microsoft.Bcl.AsyncInterfaces.csproj" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)' == 'netcoreapp'">
<ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
</ItemGroup>
</Project>
4 changes: 4 additions & 0 deletions src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<PropertyGroup>
<ProjectGuid>{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}</ProjectGuid>
<Configurations>netcoreapp-Debug;netcoreapp-Debug;netcoreapp-Release;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
<DefineConstants Condition="'$(TargetsNetFx)' == 'true'">$(DefineConstants);netstandard</DefineConstants>
</PropertyGroup>
<ItemGroup>
<Compile Include="$(CommonPath)\CoreLib\System\Threading\Tasks\TaskToApm.cs">
Expand Down Expand Up @@ -58,4 +59,7 @@
<Reference Include="System.Threading.Tasks" />
<Reference Include="System.Threading.ThreadPool" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)' != 'netcoreapp'">
<Reference Include="Microsoft.Bcl.AsyncInterfaces" />
</ItemGroup>
</Project>
208 changes: 180 additions & 28 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace System.IO.Pipelines
{
internal class StreamPipeReader : PipeReader
internal class StreamPipeReader : PipeReader, IValueTaskSource<ReadResult>
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
Expand All @@ -28,11 +30,19 @@ internal class StreamPipeReader : PipeReader
private BufferSegment _readTail;
private long _bufferedBytes;
private bool _examinedEverything;
private object _lock = new object();
private readonly object _lock = new object();

// Mutable struct! Don't make this readonly
private BufferSegmentStack _bufferSegmentPool;
private bool _leaveOpen;
private readonly bool _leaveOpen;

// State for async reads
private volatile bool _readInProgress;
private readonly Action _onReadCompleted;
private ManualResetValueTaskSourceCore<ReadResult> _readMrvts;
private ValueTaskAwaiter<int> _readAwaiter;
private CancellationToken _readCancellation;
private CancellationTokenRegistration _readRegistration;

/// <summary>
/// Creates a new StreamPipeReader.
Expand All @@ -53,6 +63,7 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
_leaveOpen = options.LeaveOpen;
_onReadCompleted = new Action(OnReadCompleted);
}

/// <summary>
Expand All @@ -72,11 +83,7 @@ private CancellationTokenSource InternalTokenSource
{
lock (_lock)
{
if (_internalTokenSource == null)
{
_internalTokenSource = new CancellationTokenSource();
}
return _internalTokenSource;
return (_internalTokenSource ??= new CancellationTokenSource());
}
}
}
Expand Down Expand Up @@ -188,39 +195,59 @@ public override void Complete(Exception exception = null)
}

/// <inheritdoc />
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();

// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
if (_readInProgress)
{
return readResult;
// Throw if there are overlapping reads; throwing unwrapped as it suggests last read was not awaited
// so we surface it directly rather than wrapped in a Task (as this one will likely also not be awaited).
ThrowConcurrentReadsNotSupported();
}
_readInProgress = true;

if (_isStreamCompleted)
bool isAsync = false;
try
{
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
}

var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
}
ThrowIfCompleted();

// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
{
return new ValueTask<ReadResult>(readResult);
}

if (_isStreamCompleted)
{
return new ValueTask<ReadResult>(new ReadResult(buffer: default, isCanceled: false, isCompleted: true));
}

var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
}

using (reg)
{
var isCanceled = false;
try
{
AllocateReadTail();

Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);

int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
ValueTask<int> resultTask = InnerStream.ReadAsync(buffer, tokenSource.Token);
int length;
if (resultTask.IsCompletedSuccessfully)
{
length = resultTask.Result;
}
else
{
isAsync = true;
// Need to go async
return CompleteReadAsync(resultTask, cancellationToken, reg);
}

Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);

Expand All @@ -247,8 +274,27 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
}

}
finally
{
if (!isAsync)
{
reg.Dispose();
}
}

return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
return new ValueTask<ReadResult>(new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted));
}
catch (Exception ex)
{
return new ValueTask<ReadResult>(Task.FromException<ReadResult>(ex));
}
finally
{
if (!isAsync)
{
Debug.Assert(_readInProgress);
_readInProgress = false;
}
}
}

Expand All @@ -270,6 +316,11 @@ private void ThrowIfCompleted()

public override bool TryRead(out ReadResult result)
{
if (_readInProgress)
{
ThrowConcurrentReadsNotSupported();
}

ThrowIfCompleted();

return TryReadInternal(InternalTokenSource, out result);
Expand Down Expand Up @@ -357,5 +408,106 @@ private void Cancel()
{
InternalTokenSource.Cancel();
}

static void ThrowConcurrentReadsNotSupported()
{
throw new InvalidOperationException($"Concurrent reads are not supported; await the {nameof(ValueTask<ReadResult>)} before starting next read.");
}

private ValueTask<ReadResult> CompleteReadAsync(ValueTask<int> task, CancellationToken cancellationToken, CancellationTokenRegistration reg)
{
Debug.Assert(_readInProgress, "Read not in progress");

_readCancellation = cancellationToken;
_readRegistration = reg;

_readAwaiter = task.GetAwaiter();

return new ValueTask<ReadResult>(this, _readMrvts.Version);
}

private void OnReadCompleted()
{
try
{
int length = _readAwaiter.GetResult();

Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);

_readTail.End += length;
_bufferedBytes += length;

if (length == 0)
{
_isStreamCompleted = true;
}

_readMrvts.SetResult(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: false, _isStreamCompleted));
}
catch (OperationCanceledException oce)
{
// Get the source before clearing (and replacing)
CancellationTokenSource tokenSource = InternalTokenSource;
ClearCancellationToken();
if (tokenSource.IsCancellationRequested && !_readCancellation.IsCancellationRequested)
{
// Catch cancellation and translate it into setting isCanceled = true
_readMrvts.SetResult(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: true, _isStreamCompleted));
}
else
{
_readMrvts.SetException(oce);
}
}
catch (Exception ex)
{
_readMrvts.SetException(ex);
}
finally
{
_readRegistration.Dispose();
_readRegistration = default;
}
}

ReadResult IValueTaskSource<ReadResult>.GetResult(short token)
{
ValidateReading();
ReadResult result = _readMrvts.GetResult(token);

_readCancellation = default;
_readAwaiter = default;
_readMrvts.Reset();

Debug.Assert(_readInProgress);
_readInProgress = false;

return result;
}

ValueTaskSourceStatus IValueTaskSource<ReadResult>.GetStatus(short token)
=> _readMrvts.GetStatus(token);

void IValueTaskSource<ReadResult>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
ValidateReading();
_readMrvts.OnCompleted(continuation, state, token, flags);

_readAwaiter.UnsafeOnCompleted(_onReadCompleted);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ValidateReading()
{
if (!_readInProgress)
{
ThrowReadNotInProgress();
}

static void ThrowReadNotInProgress()
{
throw new InvalidOperationException("Read not in progress");
}
}
}
}
21 changes: 21 additions & 0 deletions src/System.IO.Pipelines/tests/StreamPipeReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes)
pipe.Reader.Complete();
}

[Fact]
public void ConcurrentReadsThrow()
{
var pipe = new Pipe();
var options = new StreamPipeReaderOptions(bufferSize: 4096);
PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);

ValueTask<ReadResult> valueTask = reader.ReadAsync();

Assert.False(valueTask.IsCompleted);

Assert.Throws<InvalidOperationException>(() => reader.ReadAsync());
Assert.Throws<InvalidOperationException>(() => reader.TryRead(out _));

Assert.False(valueTask.IsCompleted);

reader.Complete();

pipe.Reader.Complete();
}

[Theory]
[MemberData(nameof(ReadSettings))]
public async Task ReadWithDifferentSettings(int bytesInBuffer, int bufferSize, int minimumReadSize, int[] readBufferSizes)
Expand Down