Skip to content
This repository was archived by the owner on Aug 2, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion corefxlab.sln
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27130.2020
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{5E7EB061-B9BC-4DA2-B5E5-859AA7C67695}"
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "MultiSegmentBytesReaderNumbers", "MultiSegmentBytesReaderNumbers", "{5E7EB061-B9BC-4DA2-B5E5-859AA7C67695}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this change. Was it intentional?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted this change in #2067

ProjectSection(SolutionItems) = preProject
global.json = global.json
NuGet.Config = NuGet.Config
Expand Down
2 changes: 1 addition & 1 deletion samples/System.IO.Pipelines.Samples/CompressionSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Task Run()

// Wrap the console in a pipeline writer

var outputPipe = new Pipe(options);
var outputPipe = new ResetablePipe(options);
outputPipe.Reader.CopyToEndAsync(Console.OpenStandardOutput());

// Copy from the file reader to the console writer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private static async Task ProduceResponse(ConnectionState state, IPipeConnection
}
}

private static void WriteHeaders(HttpHeaders headers, IPipeWriter buffer)
private static void WriteHeaders(HttpHeaders headers, PipeWriter buffer)
{
foreach (var header in headers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ namespace System.IO.Pipelines.Samples
{
public class PipelineHttpContent : HttpContent
{
private readonly IPipeReader _output;
private readonly PipeReader _output;

public PipelineHttpContent(IPipeReader output)
public PipelineHttpContent(PipeReader output)
{
_output = output;
}
Expand Down
16 changes: 8 additions & 8 deletions samples/System.IO.Pipelines.Samples/HttpServer/HttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public partial class HttpConnection<TContext>
private static readonly byte[] _chunkedEndBytes = Encoding.UTF8.GetBytes("0\r\n\r\n");
private static readonly byte[] _endChunkBytes = Encoding.ASCII.GetBytes("\r\n");

private readonly IPipeReader _input;
private readonly IPipeWriter _output;
private readonly PipeReader _input;
private readonly PipeWriter _output;
private readonly IHttpApplication<TContext> _application;

public RequestHeaderDictionary RequestHeaders => _parser.RequestHeaders;
Expand All @@ -41,7 +41,7 @@ public partial class HttpConnection<TContext>

private HttpRequestParser _parser = new HttpRequestParser();

public HttpConnection(IHttpApplication<TContext> application, IPipeReader input, IPipeWriter output)
public HttpConnection(IHttpApplication<TContext> application, PipeReader input, PipeWriter output)
{
_application = application;
_input = input;
Expand All @@ -50,9 +50,9 @@ public HttpConnection(IHttpApplication<TContext> application, IPipeReader input,
_responseBody = new HttpResponseStream<TContext>(this);
}

public IPipeReader Input => _input;
public PipeReader Input => _input;

public IPipeWriter Output => _output;
public PipeWriter Output => _output;

public HttpRequestStream<TContext> RequestBody { get; set; }

Expand Down Expand Up @@ -194,12 +194,12 @@ public Task WriteAsync(Span<byte> data)
return FlushAsync(buffer);
}

public async Task FlushAsync(IPipeWriter buffer)
public async Task FlushAsync(PipeWriter buffer)
{
await buffer.FlushAsync();
}

private void WriteBeginResponseHeaders(IPipeWriter buffer)
private void WriteBeginResponseHeaders(PipeWriter buffer)
{
if (HasStarted)
{
Expand All @@ -217,7 +217,7 @@ private void WriteBeginResponseHeaders(IPipeWriter buffer)
ResponseHeaders.CopyTo(_autoChunk, buffer);
}

private void WriteEndResponse(IPipeWriter buffer)
private void WriteEndResponse(PipeWriter buffer)
{
buffer.Write(_chunkedEndBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public async Task Run()

protected abstract Task<IPipeConnection> GetConnection();

private async Task CopyCompletedAsync(IPipeReader input, IPipeWriter output)
private async Task CopyCompletedAsync(PipeReader input, PipeWriter output)
{
var result = await input.ReadAsync();
var inputBuffer = result.Buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected async Task ProcessConnection(IPipeConnection connection)

// Writing directly to pooled buffers
var output = connection.Output;
var formatter = new OutputFormatter<IPipeWriter>(output, SymbolTable.InvariantUtf8);
var formatter = new OutputFormatter<PipeWriter>(output, SymbolTable.InvariantUtf8);
formatter.Append("HTTP/1.1 200 OK");
formatter.Append("\r\nContent-Length: 13");
formatter.Append("\r\nContent-Type: text/plain");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,82 +10,82 @@ namespace System.IO.Pipelines.Compression
{
public static class CompressionPipelineExtensions
{
public static IPipeReader DeflateDecompress(this IPipeReader reader, PipeOptions options)
public static PipeReader DeflateDecompress(this PipeReader reader, PipeOptions options)
{
var inflater = new ReadableDeflateTransform(ZLibNative.Deflate_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = inflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}

public static IPipeReader DeflateCompress(this IPipeReader reader, PipeOptions options, CompressionLevel compressionLevel)
public static PipeReader DeflateCompress(this PipeReader reader, PipeOptions options, CompressionLevel compressionLevel)
{
var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.Deflate_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = deflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}

public static IPipeReader GZipDecompress(this IPipeReader reader, PipeOptions options)
public static PipeReader GZipDecompress(this PipeReader reader, PipeOptions options)
{
var inflater = new ReadableDeflateTransform(ZLibNative.GZip_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = inflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}

public static IPipeWriter GZipCompress(this IPipeWriter writer, PipeOptions options, CompressionLevel compressionLevel)
public static PipeWriter GZipCompress(this PipeWriter writer, PipeOptions options, CompressionLevel compressionLevel)
{
var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = deflater.Execute(pipe.Reader, writer);
return pipe.Writer;
}

public static IPipeReader GZipCompress(this IPipeReader reader, PipeOptions options, CompressionLevel compressionLevel)
public static PipeReader GZipCompress(this PipeReader reader, PipeOptions options, CompressionLevel compressionLevel)
{
var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = deflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}

public static IPipeReader CreateDeflateDecompressReader(PipeOptions options, IPipeReader reader)
public static PipeReader CreateDeflateDecompressReader(PipeOptions options, PipeReader reader)
{
var inflater = new ReadableDeflateTransform(ZLibNative.Deflate_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = inflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}

public static IPipeReader CreateDeflateCompressReader(PipeOptions options, IPipeReader reader, CompressionLevel compressionLevel)
public static PipeReader CreateDeflateCompressReader(PipeOptions options, PipeReader reader, CompressionLevel compressionLevel)
{
var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.Deflate_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = deflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}

public static IPipeReader CreateGZipDecompressReader(PipeOptions options, IPipeReader reader)
public static PipeReader CreateGZipDecompressReader(PipeOptions options, PipeReader reader)
{
var inflater = new ReadableDeflateTransform(ZLibNative.GZip_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = inflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}

public static IPipeWriter CreateGZipCompressWriter(PipeOptions options, IPipeWriter writer, CompressionLevel compressionLevel)
public static PipeWriter CreateGZipCompressWriter(PipeOptions options, PipeWriter writer, CompressionLevel compressionLevel)
{
var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = deflater.Execute(pipe.Reader, writer);
return pipe.Writer;
}

public static IPipeReader CreateGZipCompressReader(PipeOptions options, IPipeReader reader, CompressionLevel compressionLevel)
public static PipeReader CreateGZipCompressReader(PipeOptions options, PipeReader reader, CompressionLevel compressionLevel)
{
var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits);
var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = deflater.Execute(reader, pipe.Writer);
return pipe.Reader;
}
Expand All @@ -99,7 +99,7 @@ public WritableDeflateTransform(CompressionLevel compressionLevel, int bits)
_deflater = new Deflater(compressionLevel, bits);
}

public async Task Execute(IPipeReader reader, IPipeWriter writer)
public async Task Execute(PipeReader reader, PipeWriter writer)
{
List<MemoryHandle> handles = new List<MemoryHandle>();

Expand Down Expand Up @@ -210,7 +210,7 @@ public ReadableDeflateTransform(int bits)
_inflater = new Inflater(bits);
}

public async Task Execute(IPipeReader reader, IPipeWriter writer)
public async Task Execute(PipeReader reader, PipeWriter writer)
{
List<MemoryHandle> handles = new List<MemoryHandle>();

Expand Down
10 changes: 5 additions & 5 deletions src/System.IO.Pipelines.Extensions/PipelineReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace System.IO.Pipelines
{
public static class PipelineReaderExtensions
{
public static ValueTask<int> ReadAsync(this IPipeReader input, ArraySegment<byte> destination)
public static ValueTask<int> ReadAsync(this PipeReader input, ArraySegment<byte> destination)
{
while (true)
{
Expand Down Expand Up @@ -44,7 +44,7 @@ public static ValueTask<int> ReadAsync(this IPipeReader input, ArraySegment<byte
return new ValueTask<int>(input.ReadAsyncAwaited(destination));
}

public static async Task CopyToAsync(this IPipeReader input, IPipeWriter output)
public static async Task CopyToAsync(this PipeReader input, PipeWriter output)
{
while (true)
{
Expand Down Expand Up @@ -75,7 +75,7 @@ public static async Task CopyToAsync(this IPipeReader input, IPipeWriter output)
}
}

private static async Task<int> ReadAsyncAwaited(this IPipeReader input, ArraySegment<byte> destination)
private static async Task<int> ReadAsyncAwaited(this PipeReader input, ArraySegment<byte> destination)
{
while (true)
{
Expand All @@ -99,12 +99,12 @@ private static async Task<int> ReadAsyncAwaited(this IPipeReader input, ArraySeg
}
}

public static Task CopyToAsync(this IPipeReader input, Stream stream)
public static Task CopyToAsync(this PipeReader input, Stream stream)
{
return input.CopyToAsync(stream, 4096, CancellationToken.None);
}

public static async Task CopyToAsync(this IPipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken)
public static async Task CopyToAsync(this PipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken)
{
// TODO: Use bufferSize argument
while (!cancellationToken.IsCancellationRequested)
Expand Down
6 changes: 3 additions & 3 deletions src/System.IO.Pipelines.Extensions/ReadWriteExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static void WriteLittleEndian<[Primitive]T>(this Span<byte> buffer, T value) whe
WriteMachineEndian(buffer, ref value);
}

public static async Task<ReadOnlyBuffer<byte>> ReadToEndAsync(this IPipeReader input)
public static async Task<ReadOnlyBuffer<byte>> ReadToEndAsync(this PipeReader input)
{
while (true)
{
Expand Down Expand Up @@ -158,7 +158,7 @@ private static T ReadMultiLittle<[Primitive]T>(ReadOnlyBuffer<byte> buffer, int
/// Reads a structure of type T out of a buffer of bytes.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteBigEndian<[Primitive]T>(this IPipeWriter buffer, T value) where T : struct
public static void WriteBigEndian<[Primitive]T>(this PipeWriter buffer, T value) where T : struct
{
int len = Unsafe.SizeOf<T>();
buffer.GetMemory(len).Span.WriteBigEndian(value);
Expand All @@ -169,7 +169,7 @@ public static void WriteBigEndian<[Primitive]T>(this IPipeWriter buffer, T value
/// Reads a structure of type T out of a buffer of bytes.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteLittleEndian<[Primitive]T>(this IPipeWriter buffer, T value) where T : struct
public static void WriteLittleEndian<[Primitive]T>(this PipeWriter buffer, T value) where T : struct
{
int len = Unsafe.SizeOf<T>();
buffer.GetMemory(len).Span.WriteLittleEndian(value);
Expand Down
14 changes: 7 additions & 7 deletions src/System.IO.Pipelines.Extensions/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ namespace System.IO.Pipelines
public static class StreamExtensions
{
/// <summary>
/// Copies the content of a <see cref="Stream"/> into a <see cref="IPipeWriter"/>.
/// Copies the content of a <see cref="Stream"/> into a <see cref="PipeWriter"/>.
/// </summary>
/// <param name="stream"></param>
/// <param name="writer"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static Task CopyToAsync(this Stream stream, IPipeWriter writer, CancellationToken cancellationToken = default)
public static Task CopyToAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
// 81920 is the default bufferSize, there is not stream.CopyToAsync overload that takes only a cancellationToken
return stream.CopyToAsync(new PipelineWriterStream(writer), bufferSize: 81920, cancellationToken: cancellationToken);
}

public static async Task CopyToEndAsync(this Stream stream, IPipeWriter writer, CancellationToken cancellationToken = default)
public static async Task CopyToEndAsync(this Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
try
{
Expand Down Expand Up @@ -77,12 +77,12 @@ await stream.WriteAsync(data.Array, data.Offset, data.Count)
}
}

public static Task CopyToEndAsync(this IPipeReader input, Stream stream)
public static Task CopyToEndAsync(this PipeReader input, Stream stream)
{
return input.CopyToEndAsync(stream, 4096, CancellationToken.None);
}

public static async Task CopyToEndAsync(this IPipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken)
public static async Task CopyToEndAsync(this PipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken)
{
try
{
Expand All @@ -98,9 +98,9 @@ public static async Task CopyToEndAsync(this IPipeReader input, Stream stream, i

private class PipelineWriterStream : Stream
{
private readonly IPipeWriter _writer;
private readonly PipeWriter _writer;

public PipelineWriterStream(IPipeWriter writer)
public PipelineWriterStream(PipeWriter writer)
{
_writer = writer;
}
Expand Down
12 changes: 6 additions & 6 deletions src/System.IO.Pipelines.Extensions/StreamPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,37 @@ public StreamPipeConnection(PipeOptions options, Stream stream)
Output = CreateWriter(options, stream);
}

public IPipeReader Input { get; }
public PipeReader Input { get; }

public IPipeWriter Output { get; }
public PipeWriter Output { get; }

public void Dispose()
{
Input.Complete();
Output.Complete();
}

public static IPipeReader CreateReader(PipeOptions options, Stream stream)
public static PipeReader CreateReader(PipeOptions options, Stream stream)
{
if (!stream.CanRead)
{
throw new NotSupportedException();
}

var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = stream.CopyToEndAsync(pipe.Writer);

return pipe.Reader;
}

public static IPipeWriter CreateWriter(PipeOptions options, Stream stream)
public static PipeWriter CreateWriter(PipeOptions options, Stream stream)
{
if (!stream.CanWrite)
{
throw new NotSupportedException();
}

var pipe = new Pipe(options);
var pipe = new ResetablePipe(options);
var ignore = pipe.Reader.CopyToEndAsync(stream);

return pipe.Writer;
Expand Down
Loading