Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/Mvc/Mvc.Core/src/Formatters/SystemTextJsonOutputFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Formatters.Json;
Expand Down Expand Up @@ -65,13 +66,21 @@ public sealed override async Task WriteResponseBodyAsync(OutputFormatterWriteCon
// the behavior you get when the user does not declare the return type and with Json.Net at least at the top level.
var objectType = context.Object?.GetType() ?? context.ObjectType;
await JsonSerializer.SerializeAsync(writeStream, context.Object, objectType, SerializerOptions);

// The transcoding streams use Encoders and Decoders that have internal buffers. We need to flush these
// when there is no more data to be written. Stream.FlushAsync isn't suitable since it's
// acceptable to Flush a Stream (multiple times) prior to completion.
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcodingStream.FinalWriteAsync(CancellationToken.None);
}
await writeStream.FlushAsync();
}
finally
{
if (writeStream is TranscodingWriteStream transcoding)
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcoding.DisposeAsync();
await transcodingStream.DisposeAsync();
}
}
}
Expand Down
122 changes: 54 additions & 68 deletions src/Mvc/Mvc.Core/src/Formatters/TranscodingReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Text.Unicode;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.Mvc.Formatters.Json
{
internal sealed class TranscodingReadStream : Stream
{
private static readonly int OverflowBufferSize = Encoding.UTF8.GetMaxByteCount(1); // The most number of bytes used to represent a single UTF char

internal const int MaxByteBufferSize = 4096;
internal const int MaxCharBufferSize = 3 * MaxByteBufferSize;
private static readonly int MaxByteCountForUTF8Char = Encoding.UTF8.GetMaxByteCount(charCount: 1);

private readonly Stream _stream;
private readonly Encoder _encoder;
private readonly Decoder _decoder;

private ArraySegment<byte> _byteBuffer;
Expand Down Expand Up @@ -48,19 +49,23 @@ public TranscodingReadStream(Stream input, Encoding sourceEncoding)
count: 0);

_overflowBuffer = new ArraySegment<byte>(
ArrayPool<byte>.Shared.Rent(MaxByteCountForUTF8Char),
ArrayPool<byte>.Shared.Rent(OverflowBufferSize),
0,
count: 0);

_encoder = Encoding.UTF8.GetEncoder();
_decoder = sourceEncoding.GetDecoder();
}

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position { get; set; }

public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

internal int ByteBufferCount => _byteBuffer.Count;
internal int CharBufferCount => _charBuffer.Count;
Expand All @@ -76,6 +81,11 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
{
ThrowArgumentOutOfRangeException(buffer, offset, count);

if (count == 0)
{
return 0;
}

var readBuffer = new ArraySegment<byte>(buffer, offset, count);

if (_overflowBuffer.Count > 0)
Expand All @@ -90,76 +100,55 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
return bytesToCopy;
}

var totalBytes = 0;
bool encoderCompleted;
int bytesEncoded;
if (_charBuffer.Count == 0)
{
// Only read more content from the input stream if we have exhausted all the buffered chars.
await ReadInputChars(cancellationToken);
}

var operationStatus = Utf8.FromUtf16(_charBuffer, readBuffer, out var charsRead, out var bytesWritten, isFinalBlock: false);
_charBuffer = _charBuffer.Slice(charsRead);

do
switch (operationStatus)
{
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
// the segment that follows.
var eof = false;
if (_charBuffer.Count == 0)
{
// Only read more content from the input stream if we have exhausted all the buffered chars.
eof = await ReadInputChars(cancellationToken);
}

// We need to flush on the last write. This is true when we exhaust the input Stream and any buffered content.
var allContentRead = eof && _charBuffer.Count == 0 && _byteBuffer.Count == 0;

if (_charBuffer.Count > 0 && readBuffer.Count < MaxByteCountForUTF8Char && readBuffer.Count < Encoding.UTF8.GetByteCount(_charBuffer.AsSpan(0, 1)))
{
// It's possible that the passed in buffer is smaller than the size required to encode a single
// char. For instance, the JsonSerializer may pass in a buffer of size 1 or 2 which
// is insufficient if the character requires more than 2 bytes to represent. In this case, read
// content in to an overflow buffer and fill up the passed in buffer.
_encoder.Convert(
_charBuffer,
_overflowBuffer.Array,
flush: false,
out var charsUsed,
out var bytesUsed,
out _);
case OperationStatus.Done:
return bytesWritten;

case OperationStatus.DestinationTooSmall:
if (bytesWritten != 0)
{
return bytesWritten;
}

// Overflow buffer is always empty when we get here and we can use it's full length to write contents to.
Utf8.FromUtf16(_charBuffer, _overflowBuffer.Array, out var overFlowChars, out var overflowBytes, isFinalBlock: false);

_charBuffer = _charBuffer.Slice(charsUsed);
Debug.Assert(overflowBytes > 0 && overFlowChars > 0, "We expect writes to the overflow buffer to always succeed since it is large enough to accomodate at least one char.");

Debug.Assert(readBuffer.Count < bytesUsed);
_charBuffer = _charBuffer.Slice(overFlowChars);

// readBuffer: [ 0, 0, ], overflowBuffer: [ 7, 13, 34, ]
// Fill up the readBuffer to capacity, so the result looks like so:
// readBuffer: [ 7, 13 ], overflowBuffer: [ 34 ]
Debug.Assert(readBuffer.Count < overflowBytes);
_overflowBuffer.Array.AsSpan(0, readBuffer.Count).CopyTo(readBuffer);

_overflowBuffer = new ArraySegment<byte>(
_overflowBuffer.Array,
readBuffer.Count,
bytesUsed - readBuffer.Count);

totalBytes += readBuffer.Count;
// At this point we're done writing.
break;
}
else
{
_encoder.Convert(
_charBuffer,
readBuffer,
flush: allContentRead,
out var charsUsed,
out bytesEncoded,
out encoderCompleted);

totalBytes += bytesEncoded;
_charBuffer = _charBuffer.Slice(charsUsed);
readBuffer = readBuffer.Slice(bytesEncoded);
}

// We need to exit in one of the 2 conditions:
// * encoderCompleted will return false if "buffer" was too small for all the chars to be encoded.
// * no bytes were converted in an iteration. This can occur if there wasn't any input.
} while (encoderCompleted && bytesEncoded > 0);

return totalBytes;
overflowBytes - readBuffer.Count);

Debug.Assert(_overflowBuffer.Count != 0);

return readBuffer.Count;

default:
Debug.Fail("We should never see this");
throw new InvalidOperationException();
}
}

private async ValueTask<bool> ReadInputChars(CancellationToken cancellationToken)
private async Task ReadInputChars(CancellationToken cancellationToken)
{
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
// the segment that follows.
Expand All @@ -184,15 +173,12 @@ private async ValueTask<bool> ReadInputChars(CancellationToken cancellationToken
out _);

_byteBuffer = _byteBuffer.Slice(bytesUsed);

_charBuffer = new ArraySegment<char>(_charBuffer.Array, 0, charsUsed);

return readBytes == 0;
}

private static void ThrowArgumentOutOfRangeException(byte[] buffer, int offset, int count)
{
if (count <= 0)
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
Expand Down
72 changes: 44 additions & 28 deletions src/Mvc/Mvc.Core/src/Formatters/TranscodingWriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public override void Flush()

public override async Task FlushAsync(CancellationToken cancellationToken)
{
await WriteAsync(ArraySegment<byte>.Empty, flush: true, cancellationToken);
await _stream.FlushAsync(cancellationToken);
}

Expand All @@ -73,12 +72,11 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
{
ThrowArgumentException(buffer, offset, count);
var bufferSegment = new ArraySegment<byte>(buffer, offset, count);
return WriteAsync(bufferSegment, flush: false, cancellationToken);
return WriteAsync(bufferSegment, cancellationToken);
}

private async Task WriteAsync(
ArraySegment<byte> bufferSegment,
bool flush,
CancellationToken cancellationToken)
{
var decoderCompleted = false;
Expand All @@ -87,50 +85,43 @@ private async Task WriteAsync(
_decoder.Convert(
bufferSegment,
_charBuffer.AsSpan(_charsDecoded),
flush,
flush: false,
out var bytesDecoded,
out var charsDecoded,
out decoderCompleted);

_charsDecoded += charsDecoded;
bufferSegment = bufferSegment.Slice(bytesDecoded);

if (flush || !decoderCompleted)
if (!decoderCompleted)
{
// This is being invoked from FlushAsync or the char buffer is not large enough
// to accomodate all writes.
await WriteBufferAsync(flush, cancellationToken);
await WriteBufferAsync(cancellationToken);
}
}
}

private async Task WriteBufferAsync(bool flush, CancellationToken cancellationToken)
private async Task WriteBufferAsync(CancellationToken cancellationToken)
{
var encoderCompletd = false;
var encoderCompleted = false;
var charsWritten = 0;
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);

try
while (!encoderCompleted && charsWritten < _charsDecoded)
{
while (!encoderCompletd && charsWritten < _charsDecoded)
{
_encoder.Convert(
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
byteBuffer,
flush,
out var charsEncoded,
out var bytesUsed,
out encoderCompletd);

await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
charsWritten += charsEncoded;
}
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
_encoder.Convert(
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
byteBuffer,
flush: false,
out var charsEncoded,
out var bytesUsed,
out encoderCompleted);

await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
charsWritten += charsEncoded;
}

ArrayPool<byte>.Shared.Return(byteBuffer);

// At this point, we've written all the buffered chars to the underlying Stream.
_charsDecoded = 0;
}
Expand Down Expand Up @@ -161,5 +152,30 @@ protected override void Dispose(bool disposing)
ArrayPool<char>.Shared.Return(_charBuffer);
}
}

public async Task FinalWriteAsync(CancellationToken cancellationToken)
{
// First write any buffered content
await WriteBufferAsync(cancellationToken);

// Now flush the encoder.
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);
var encoderCompleted = false;

while (!encoderCompleted)
{
_encoder.Convert(
Array.Empty<char>(),
byteBuffer,
flush: true,
out _,
out var bytesUsed,
out encoderCompleted);

await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
}

ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Core;
Expand Down Expand Up @@ -83,13 +84,21 @@ public async Task ExecuteAsync(ActionContext context, JsonResult result)

var type = value?.GetType() ?? typeof(object);
await JsonSerializer.SerializeAsync(writeStream, value, type, jsonSerializerOptions);

// The transcoding streams use Encoders and Decoders that have internal buffers. We need to flush these
// when there is no more data to be written. Stream.FlushAsync isn't suitable since it's
// acceptable to Flush a Stream (multiple times) prior to completion.
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcodingStream.FinalWriteAsync(CancellationToken.None);
}
await writeStream.FlushAsync();
}
finally
{
if (writeStream is TranscodingWriteStream transcoding)
if (writeStream is TranscodingWriteStream transcodingStream)
{
await transcoding.DisposeAsync();
await transcodingStream.DisposeAsync();
}
}
}
Expand Down
Loading