Skip to content

Commit 7803e40

Browse files
committed
Code review feedback for transcoding Streams
1 parent 7a1b867 commit 7803e40

File tree

6 files changed

+119
-107
lines changed

6 files changed

+119
-107
lines changed

src/Mvc/Mvc.Core/src/Formatters/SystemTextJsonOutputFormatter.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.IO;
66
using System.Text;
77
using System.Text.Json;
8+
using System.Threading;
89
using System.Threading.Tasks;
910
using Microsoft.AspNetCore.Http;
1011
using Microsoft.AspNetCore.Mvc.Formatters.Json;
@@ -65,13 +66,17 @@ public sealed override async Task WriteResponseBodyAsync(OutputFormatterWriteCon
6566
// the behavior you get when the user does not declare the return type and with Json.Net at least at the top level.
6667
var objectType = context.Object?.GetType() ?? context.ObjectType;
6768
await JsonSerializer.SerializeAsync(writeStream, context.Object, objectType, SerializerOptions);
69+
if (writeStream is TranscodingWriteStream transcodingStream)
70+
{
71+
await transcodingStream.FinalWriteAsync(CancellationToken.None);
72+
}
6873
await writeStream.FlushAsync();
6974
}
7075
finally
7176
{
72-
if (writeStream is TranscodingWriteStream transcoding)
77+
if (writeStream is TranscodingWriteStream transcodingStream)
7378
{
74-
await transcoding.DisposeAsync();
79+
await transcodingStream.DisposeAsync();
7580
}
7681
}
7782
}

src/Mvc/Mvc.Core/src/Formatters/TranscodingReadStream.cs

Lines changed: 49 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@
66
using System.Diagnostics;
77
using System.IO;
88
using System.Text;
9+
using System.Text.Unicode;
910
using System.Threading;
1011
using System.Threading.Tasks;
1112

1213
namespace Microsoft.AspNetCore.Mvc.Formatters.Json
1314
{
1415
internal sealed class TranscodingReadStream : Stream
1516
{
17+
private const int OverflowBufferSize = 4; // The most number of bytes used to represent a single UTF char
18+
1619
internal const int MaxByteBufferSize = 4096;
1720
internal const int MaxCharBufferSize = 3 * MaxByteBufferSize;
18-
private static readonly int MaxByteCountForUTF8Char = Encoding.UTF8.GetMaxByteCount(charCount: 1);
1921

2022
private readonly Stream _stream;
21-
private readonly Encoder _encoder;
2223
private readonly Decoder _decoder;
2324

2425
private ArraySegment<byte> _byteBuffer;
@@ -48,19 +49,23 @@ public TranscodingReadStream(Stream input, Encoding sourceEncoding)
4849
count: 0);
4950

5051
_overflowBuffer = new ArraySegment<byte>(
51-
ArrayPool<byte>.Shared.Rent(MaxByteCountForUTF8Char),
52+
ArrayPool<byte>.Shared.Rent(OverflowBufferSize),
5253
0,
5354
count: 0);
5455

55-
_encoder = Encoding.UTF8.GetEncoder();
5656
_decoder = sourceEncoding.GetDecoder();
5757
}
5858

5959
public override bool CanRead => true;
6060
public override bool CanSeek => false;
6161
public override bool CanWrite => false;
6262
public override long Length => throw new NotSupportedException();
63-
public override long Position { get; set; }
63+
64+
public override long Position
65+
{
66+
get => throw new NotSupportedException();
67+
set => throw new NotSupportedException();
68+
}
6469

6570
internal int ByteBufferCount => _byteBuffer.Count;
6671
internal int CharBufferCount => _charBuffer.Count;
@@ -76,6 +81,11 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
7681
{
7782
ThrowArgumentOutOfRangeException(buffer, offset, count);
7883

84+
if (count == 0)
85+
{
86+
return 0;
87+
}
88+
7989
var readBuffer = new ArraySegment<byte>(buffer, offset, count);
8090

8191
if (_overflowBuffer.Count > 0)
@@ -90,76 +100,50 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
90100
return bytesToCopy;
91101
}
92102

93-
var totalBytes = 0;
94-
bool encoderCompleted;
95-
int bytesEncoded;
103+
if (_charBuffer.Count == 0)
104+
{
105+
// Only read more content from the input stream if we have exhausted all the buffered chars.
106+
await ReadInputChars(cancellationToken);
107+
}
108+
109+
var operationStatus = Utf8.FromUtf16(_charBuffer, readBuffer, out var charsRead, out var bytesWritten, isFinalBlock: false);
110+
_charBuffer = _charBuffer.Slice(charsRead);
96111

97-
do
112+
switch (operationStatus)
98113
{
99-
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
100-
// the segment that follows.
101-
var eof = false;
102-
if (_charBuffer.Count == 0)
103-
{
104-
// Only read more content from the input stream if we have exhausted all the buffered chars.
105-
eof = await ReadInputChars(cancellationToken);
106-
}
107-
108-
// We need to flush on the last write. This is true when we exhaust the input Stream and any buffered content.
109-
var allContentRead = eof && _charBuffer.Count == 0 && _byteBuffer.Count == 0;
110-
111-
if (_charBuffer.Count > 0 && readBuffer.Count < MaxByteCountForUTF8Char && readBuffer.Count < Encoding.UTF8.GetByteCount(_charBuffer.AsSpan(0, 1)))
112-
{
113-
// It's possible that the passed in buffer is smaller than the size required to encode a single
114-
// char. For instance, the JsonSerializer may pass in a buffer of size 1 or 2 which
115-
// is insufficient if the character requires more than 2 bytes to represent. In this case, read
116-
// content in to an overflow buffer and fill up the passed in buffer.
117-
_encoder.Convert(
118-
_charBuffer,
119-
_overflowBuffer.Array,
120-
flush: false,
121-
out var charsUsed,
122-
out var bytesUsed,
123-
out _);
114+
case OperationStatus.Done:
115+
return bytesWritten;
116+
117+
case OperationStatus.DestinationTooSmall:
118+
if (bytesWritten != 0)
119+
{
120+
return bytesWritten;
121+
}
124122

125-
_charBuffer = _charBuffer.Slice(charsUsed);
123+
// Overflow buffer is always empty when we get here and we can use it's full length to write contents to.
124+
Utf8.FromUtf16(_charBuffer, _overflowBuffer.Array, out var overFlowChars, out var overflowBytes, isFinalBlock: false);
125+
Debug.Assert(overflowBytes > 0 && overFlowChars > 0, "We expect writes to the overflow buffer to always succeed since it is large enough to accomodate at least one char.");
126+
Debug.Assert(readBuffer.Count < overflowBytes);
126127

127-
Debug.Assert(readBuffer.Count < bytesUsed);
128+
_charBuffer = _charBuffer.Slice(overFlowChars);
128129
_overflowBuffer.Array.AsSpan(0, readBuffer.Count).CopyTo(readBuffer);
129130

130131
_overflowBuffer = new ArraySegment<byte>(
131132
_overflowBuffer.Array,
132133
readBuffer.Count,
133-
bytesUsed - readBuffer.Count);
134-
135-
totalBytes += readBuffer.Count;
136-
// At this point we're done writing.
137-
break;
138-
}
139-
else
140-
{
141-
_encoder.Convert(
142-
_charBuffer,
143-
readBuffer,
144-
flush: allContentRead,
145-
out var charsUsed,
146-
out bytesEncoded,
147-
out encoderCompleted);
148-
149-
totalBytes += bytesEncoded;
150-
_charBuffer = _charBuffer.Slice(charsUsed);
151-
readBuffer = readBuffer.Slice(bytesEncoded);
152-
}
153-
154-
// We need to exit in one of the 2 conditions:
155-
// * encoderCompleted will return false if "buffer" was too small for all the chars to be encoded.
156-
// * no bytes were converted in an iteration. This can occur if there wasn't any input.
157-
} while (encoderCompleted && bytesEncoded > 0);
158-
159-
return totalBytes;
134+
overflowBytes - readBuffer.Count);
135+
136+
Debug.Assert(_overflowBuffer.Count != 0);
137+
138+
return readBuffer.Count;
139+
140+
default:
141+
Debug.Fail("We should never see this");
142+
throw new InvalidOperationException();
143+
}
160144
}
161145

162-
private async ValueTask<bool> ReadInputChars(CancellationToken cancellationToken)
146+
private async Task ReadInputChars(CancellationToken cancellationToken)
163147
{
164148
// If we had left-over bytes from a previous read, move it to the start of the buffer and read content in to
165149
// the segment that follows.
@@ -184,15 +168,12 @@ private async ValueTask<bool> ReadInputChars(CancellationToken cancellationToken
184168
out _);
185169

186170
_byteBuffer = _byteBuffer.Slice(bytesUsed);
187-
188171
_charBuffer = new ArraySegment<char>(_charBuffer.Array, 0, charsUsed);
189-
190-
return readBytes == 0;
191172
}
192173

193174
private static void ThrowArgumentOutOfRangeException(byte[] buffer, int offset, int count)
194175
{
195-
if (count <= 0)
176+
if (count < 0)
196177
{
197178
throw new ArgumentOutOfRangeException(nameof(count));
198179
}

src/Mvc/Mvc.Core/src/Formatters/TranscodingWriteStream.cs

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public override void Flush()
4949

5050
public override async Task FlushAsync(CancellationToken cancellationToken)
5151
{
52-
await WriteAsync(ArraySegment<byte>.Empty, flush: true, cancellationToken);
5352
await _stream.FlushAsync(cancellationToken);
5453
}
5554

@@ -73,12 +72,11 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
7372
{
7473
ThrowArgumentException(buffer, offset, count);
7574
var bufferSegment = new ArraySegment<byte>(buffer, offset, count);
76-
return WriteAsync(bufferSegment, flush: false, cancellationToken);
75+
return WriteAsync(bufferSegment, cancellationToken);
7776
}
7877

7978
private async Task WriteAsync(
8079
ArraySegment<byte> bufferSegment,
81-
bool flush,
8280
CancellationToken cancellationToken)
8381
{
8482
var decoderCompleted = false;
@@ -87,50 +85,43 @@ private async Task WriteAsync(
8785
_decoder.Convert(
8886
bufferSegment,
8987
_charBuffer.AsSpan(_charsDecoded),
90-
flush,
88+
flush: false,
9189
out var bytesDecoded,
9290
out var charsDecoded,
9391
out decoderCompleted);
9492

9593
_charsDecoded += charsDecoded;
9694
bufferSegment = bufferSegment.Slice(bytesDecoded);
9795

98-
if (flush || !decoderCompleted)
96+
if (!decoderCompleted)
9997
{
100-
// This is being invoked from FlushAsync or the char buffer is not large enough
101-
// to accomodate all writes.
102-
await WriteBufferAsync(flush, cancellationToken);
98+
await WriteBufferAsync(cancellationToken);
10399
}
104100
}
105101
}
106102

107-
private async Task WriteBufferAsync(bool flush, CancellationToken cancellationToken)
103+
private async Task WriteBufferAsync(CancellationToken cancellationToken)
108104
{
109-
var encoderCompletd = false;
105+
var encoderCompleted = false;
110106
var charsWritten = 0;
111107
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);
112108

113-
try
109+
while (!encoderCompleted && charsWritten < _charsDecoded)
114110
{
115-
while (!encoderCompletd && charsWritten < _charsDecoded)
116-
{
117-
_encoder.Convert(
118-
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
119-
byteBuffer,
120-
flush,
121-
out var charsEncoded,
122-
out var bytesUsed,
123-
out encoderCompletd);
124-
125-
await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
126-
charsWritten += charsEncoded;
127-
}
128-
}
129-
finally
130-
{
131-
ArrayPool<byte>.Shared.Return(byteBuffer);
111+
_encoder.Convert(
112+
_charBuffer.AsSpan(charsWritten, _charsDecoded - charsWritten),
113+
byteBuffer,
114+
flush: false,
115+
out var charsEncoded,
116+
out var bytesUsed,
117+
out encoderCompleted);
118+
119+
await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
120+
charsWritten += charsEncoded;
132121
}
133122

123+
ArrayPool<byte>.Shared.Return(byteBuffer);
124+
134125
// At this point, we've written all the buffered chars to the underlying Stream.
135126
_charsDecoded = 0;
136127
}
@@ -161,5 +152,30 @@ protected override void Dispose(bool disposing)
161152
ArrayPool<char>.Shared.Return(_charBuffer);
162153
}
163154
}
155+
156+
public async Task FinalWriteAsync(CancellationToken cancellationToken)
157+
{
158+
// First write any buffered content
159+
await WriteBufferAsync(cancellationToken);
160+
161+
// Now flush the encoder.
162+
var byteBuffer = ArrayPool<byte>.Shared.Rent(_maxByteBufferSize);
163+
var encoderCompleted = false;
164+
165+
while (!encoderCompleted)
166+
{
167+
_encoder.Convert(
168+
Array.Empty<char>(),
169+
byteBuffer,
170+
flush: true,
171+
out _,
172+
out var bytesUsed,
173+
out encoderCompleted);
174+
175+
await _stream.WriteAsync(byteBuffer.AsMemory(0, bytesUsed), cancellationToken);
176+
}
177+
178+
ArrayPool<byte>.Shared.Return(byteBuffer);
179+
}
164180
}
165181
}

src/Mvc/Mvc.Core/src/Infrastructure/SystemTextJsonResultExecutor.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.IO;
77
using System.Text;
88
using System.Text.Json;
9+
using System.Threading;
910
using System.Threading.Tasks;
1011
using Microsoft.AspNetCore.Http;
1112
using Microsoft.AspNetCore.Mvc.Core;
@@ -83,13 +84,17 @@ public async Task ExecuteAsync(ActionContext context, JsonResult result)
8384

8485
var type = value?.GetType() ?? typeof(object);
8586
await JsonSerializer.SerializeAsync(writeStream, value, type, jsonSerializerOptions);
87+
if (writeStream is TranscodingWriteStream transcodingStream)
88+
{
89+
await transcodingStream.FinalWriteAsync(CancellationToken.None);
90+
}
8691
await writeStream.FlushAsync();
8792
}
8893
finally
8994
{
90-
if (writeStream is TranscodingWriteStream transcoding)
95+
if (writeStream is TranscodingWriteStream transcodingStream)
9196
{
92-
await transcoding.DisposeAsync();
97+
await transcodingStream.DisposeAsync();
9398
}
9499
}
95100
}

0 commit comments

Comments
 (0)