Skip to content

Commit c903d75

Browse files
committed
cleanup AsyncServletOutputStreamWriter
1 parent e632dde commit c903d75

File tree

2 files changed

+55
-54
lines changed

2 files changed

+55
-54
lines changed

servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java

Lines changed: 53 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package io.grpc.servlet;
1818

19+
import static com.google.common.base.Preconditions.checkState;
1920
import static io.grpc.servlet.ServletServerStream.toHexString;
2021
import static java.util.logging.Level.FINE;
2122
import static java.util.logging.Level.FINEST;
2223

2324
import io.grpc.InternalLogId;
24-
import io.grpc.Status;
2525
import io.grpc.servlet.ServletServerStream.ServletTransportState;
2626
import java.io.IOException;
2727
import java.time.Duration;
@@ -57,9 +57,9 @@ final class AsyncServletOutputStreamWriter {
5757
* </pre>
5858
*
5959
* <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
60-
* application thread (calling {@code runOrBufferActionItem()}) that read and update the
61-
* writeState. Only onWritePossible() may turn readyAndEmpty from false to true, and only
62-
* runOrBufferActionItem() may turn it from true to false.
60+
* application thread (calling {@code runOrBuffer()}) that read and update the
61+
* writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
62+
* only runOrBuffer() may turn it from true to false.
6363
*/
6464
private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
6565

@@ -82,10 +82,9 @@ final class AsyncServletOutputStreamWriter {
8282

8383
AsyncServletOutputStreamWriter(
8484
AsyncContext asyncContext,
85-
ServletOutputStream outputStream,
8685
ServletTransportState transportState,
87-
InternalLogId logId) {
88-
this.outputStream = outputStream;
86+
InternalLogId logId) throws IOException {
87+
this.outputStream = asyncContext.getResponse().getOutputStream();
8988
this.transportState = transportState;
9089
this.logId = logId;
9190
this.flushAction = () -> {
@@ -105,7 +104,7 @@ final class AsyncServletOutputStreamWriter {
105104

106105
/** Called from application thread. */
107106
void writeBytes(byte[] bytes, int numBytes) throws IOException {
108-
runOrBufferActionItem(
107+
runOrBuffer(
109108
// write bytes action
110109
() -> {
111110
outputStream.write(bytes, 0, numBytes);
@@ -121,24 +120,23 @@ void writeBytes(byte[] bytes, int numBytes) throws IOException {
121120

122121
/** Called from application thread. */
123122
void flush() throws IOException {
124-
runOrBufferActionItem(flushAction);
123+
runOrBuffer(flushAction);
125124
}
126125

127126
/** Called from application thread. */
128127
void complete() {
129128
try {
130-
runOrBufferActionItem(completeAction);
131-
} catch (IOException e) {
132-
// actually completeAction does not throw
133-
throw Status.fromThrowable(e).asRuntimeException();
129+
runOrBuffer(completeAction);
130+
} catch (IOException ignore) {
131+
// actually completeAction does not throw IOException
134132
}
135133
}
136134

137135
/** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
138136
void onWritePossible() throws IOException {
139137
logger.log(
140138
FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId);
141-
assureReadyAndEmptyFalse();
139+
assureReadyAndDrainedTurnsFalse();
142140
while (outputStream.isReady()) {
143141
WriteState curState = writeState.get();
144142

@@ -148,7 +146,7 @@ void onWritePossible() throws IOException {
148146
continue;
149147
}
150148

151-
if (writeState.compareAndSet(curState, curState.withReadyAndEmpty(true))) {
149+
if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
152150
// state has not changed since.
153151
logger.log(
154152
FINEST,
@@ -157,47 +155,56 @@ void onWritePossible() throws IOException {
157155
logId);
158156
return;
159157
}
160-
// else, state changed by another thread (runOrBufferActionItem), need to drain the writeChain
158+
// else, state changed by another thread (runOrBuffer()), need to drain the writeChain
161159
// again
162160
}
163161
logger.log(
164162
FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId);
165163
}
166164

167-
private void runOrBufferActionItem(ActionItem actionItem) throws IOException {
165+
private void assureReadyAndDrainedTurnsFalse() {
166+
// readyAndDrained should have been set to false already.
167+
// Just in case due to a race condition readyAndDrained is still true at this moment and is
168+
// being set to false by runOrBuffer() concurrently.
169+
while (writeState.get().readyAndDrained) {
170+
parkingThread = Thread.currentThread();
171+
LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
172+
}
173+
parkingThread = null;
174+
}
175+
176+
/**
177+
* Either execute the write action directly, or buffer the action and let the container thread
178+
* drain it.
179+
*
180+
* <p>Called from application thread.
181+
*/
182+
private void runOrBuffer(ActionItem actionItem) throws IOException {
168183
WriteState curState = writeState.get();
169-
if (curState.readyAndEmpty) { // write to the outputStream directly
184+
if (curState.readyAndDrained) { // write to the outputStream directly
170185
actionItem.run();
171186
if (!outputStream.isReady()) {
172-
logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId);
173-
boolean successful = writeState.compareAndSet(curState, curState.withReadyAndEmpty(false));
174-
assert successful;
187+
boolean successful =
188+
writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
175189
LockSupport.unpark(parkingThread);
190+
checkState(successful, "Bug: curState is unexpectedly changed by another thread");
191+
logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId);
176192
}
177193
} else { // buffer to the writeChain
178194
writeChain.offer(actionItem);
179-
if (!writeState.compareAndSet(curState, curState.newItemBuffered())) {
180-
// state changed by another thread (onWritePossible)
181-
assert writeState.get().readyAndEmpty;
195+
if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
196+
checkState(
197+
writeState.get().readyAndDrained,
198+
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
182199
ActionItem lastItem = writeChain.poll();
183200
if (lastItem != null) {
184-
assert lastItem == actionItem;
185-
runOrBufferActionItem(lastItem);
201+
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
202+
runOrBuffer(lastItem);
186203
}
187204
} // state has not changed since
188205
}
189206
}
190207

191-
private void assureReadyAndEmptyFalse() {
192-
// readyAndEmpty should have been set to false already or right now
193-
// It's very very unlikely readyAndEmpty is still true due to a race condition
194-
while (writeState.get().readyAndEmpty) {
195-
parkingThread = Thread.currentThread();
196-
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
197-
}
198-
parkingThread = null;
199-
}
200-
201208
/** Write actions, e.g. writeBytes, flush, complete. */
202209
@FunctionalInterface
203210
private interface ActionItem {
@@ -211,34 +218,28 @@ private static final class WriteState {
211218
/**
212219
* The servlet output stream is ready and the writeChain is empty.
213220
*
214-
* <p>readyAndEmpty turns from false to true when:
221+
* <p>readyAndDrained turns from false to true when:
215222
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
216223
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
217224
*
218-
* <p>readyAndEmpty turns from false to true when:
219-
* {@code runOrBufferActionItem()} exits while either the action item is written directly to the
225+
* <p>readyAndDrained turns from true to false when:
226+
* {@code runOrBuffer()} exits while either the action item is written directly to the
220227
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
221228
* right after that returns false, or the action item is buffered into the writeChain.
222229
*/
223-
final boolean readyAndEmpty;
230+
final boolean readyAndDrained;
224231

225-
WriteState(boolean readyAndEmpty) {
226-
this.readyAndEmpty = readyAndEmpty;
232+
WriteState(boolean readyAndDrained) {
233+
this.readyAndDrained = readyAndDrained;
227234
}
228235

229236
/**
230-
* Only {@code onWritePossible()} can set readyAndEmpty to true, and only {@code
231-
* runOrBufferActionItem()} can set it to false.
237+
* Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
238+
* runOrBuffer()} can set it to false.
232239
*/
233240
@CheckReturnValue
234-
WriteState withReadyAndEmpty(boolean readyAndEmpty) {
235-
return new WriteState(readyAndEmpty);
236-
}
237-
238-
/** Only {@code runOrBufferActionItem()} can call it, and will set readyAndEmpty to false. */
239-
@CheckReturnValue
240-
WriteState newItemBuffered() {
241-
return new WriteState(false);
241+
WriteState withReadyAndDrained(boolean readyAndDrained) {
242+
return new WriteState(readyAndDrained);
242243
}
243244
}
244245
}

servlet/src/main/java/io/grpc/servlet/ServletServerStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ final class ServletServerStream extends AbstractServerStream {
8080
this.logId = logId;
8181
this.asyncCtx = asyncCtx;
8282
this.resp = (HttpServletResponse) asyncCtx.getResponse();
83-
resp.getOutputStream().setWriteListener(new GrpcWriteListener());
8483
this.writer = new AsyncServletOutputStreamWriter(
85-
asyncCtx, resp.getOutputStream(), transportState, logId);
84+
asyncCtx, transportState, logId);
85+
resp.getOutputStream().setWriteListener(new GrpcWriteListener());
8686
}
8787

8888
@Override

0 commit comments

Comments
 (0)