/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.servlet;

import static com.google.common.base.Preconditions.checkState;
import static io.grpc.servlet.ServletServerStream.toHexString;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.InternalLogId;
import io.grpc.servlet.ServletServerStream.ServletTransportState;
import java.io.IOException;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;

/**
 * Handles write actions from the container thread and the application thread.
 *
 * <p>A write action is either executed immediately by the application thread (when the servlet
 * output stream is ready and nothing is queued), or appended to {@link #writeChain} and executed
 * later by the container thread from {@link #onWritePossible()}.
 */
final class AsyncServletOutputStreamWriter {

  /**
   * Memory boundary for write actions.
   *
   * <pre>
   * WriteState curState = writeState.get();  // mark a boundary
   * doSomething();  // do something within the boundary
   * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
   * if (successful) {
   *   // state has not changed since
   *   return;
   * } else {
   *   // state is changed by another thread while doSomething(), need recompute
   * }
   * </pre>
   *
   * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
   * application thread (calling {@code runOrBuffer()}) that read and update the
   * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
   * only runOrBuffer() may turn it from true to false.
   */
  private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);

  private final Log log;
  private final BiFunction<byte[], Integer, ActionItem> writeAction;
  private final ActionItem flushAction;
  private final ActionItem completeAction;
  private final BooleanSupplier isReady;

  /**
   * New write actions will be buffered into this queue if the servlet output stream is not ready or
   * the queue is not drained.
   */
  // SPSC queue would do
  private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();

  // for a theoretical race condition that onWritePossible() is called immediately after isReady()
  // returns false and before writeState.compareAndSet()
  @Nullable
  private volatile Thread parkingThread;

  /**
   * Creates a writer bound to the response output stream of the given async context.
   *
   * @param asyncContext supplies the response whose output stream is written to and flushed, and
   *     is completed by the complete action.
   * @param transportState receives {@code onSentBytes} and {@code complete} callbacks, both
   *     dispatched through {@code runOnTransportThread}.
   * @param logId identifier prepended (in square brackets) to every log message.
   * @throws IOException if the response output stream cannot be obtained.
   */
  AsyncServletOutputStreamWriter(
      AsyncContext asyncContext,
      ServletTransportState transportState,
      InternalLogId logId) throws IOException {
    Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
    this.log = new Log() {
      @Override
      public void fine(String str, Object... params) {
        if (logger.isLoggable(FINE)) {
          // Same "[id] message" layout as finest(); the separating space used to be missing here.
          logger.log(FINE, "[" + logId + "] " + str, params);
        }
      }

      @Override
      public void finest(String str, Object... params) {
        if (logger.isLoggable(FINEST)) {
          logger.log(FINEST, "[" + logId + "] " + str, params);
        }
      }
    };

    ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
    this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
      outputStream.write(bytes, 0, numBytes);
      transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
      // Arguments are evaluated before log.finest() can check the level, so guard explicitly:
      // hex-encoding every outbound buffer is wasted work unless FINEST is enabled.
      // NOTE(review): assumes toHexString() has no side effects - confirm.
      if (logger.isLoggable(FINEST)) {
        log.finest(
            "outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
      }
    };
    this.flushAction = () -> {
      log.finest("flushBuffer");
      asyncContext.getResponse().flushBuffer();
    };
    this.completeAction = () -> {
      log.fine("call is completing");
      transportState.runOnTransportThread(
          () -> {
            transportState.complete();
            asyncContext.complete();
            log.fine("call completed");
          });
    };
    this.isReady = () -> outputStream.isReady();
  }

  /**
   * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run.
   *
   * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
   * @param flushAction The {@link ActionItem} executed for {@link #flush()}.
   * @param completeAction The {@link ActionItem} executed for {@link #complete()}.
   * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
   * @param log Sink for diagnostic messages.
   */
  @VisibleForTesting
  AsyncServletOutputStreamWriter(
      BiFunction<byte[], Integer, ActionItem> writeAction,
      ActionItem flushAction,
      ActionItem completeAction,
      BooleanSupplier isReady,
      Log log) {
    this.writeAction = writeAction;
    this.flushAction = flushAction;
    this.completeAction = completeAction;
    this.isReady = isReady;
    this.log = log;
  }

  /**
   * Writes the first {@code numBytes} bytes of {@code bytes}, either immediately or after
   * buffering. Called from application thread.
   *
   * @throws IOException if the write action is executed directly and fails.
   */
  void writeBytes(byte[] bytes, int numBytes) throws IOException {
    runOrBuffer(writeAction.apply(bytes, numBytes));
  }

  /**
   * Runs or buffers the flush action. Called from application thread.
   *
   * @throws IOException if the flush action is executed directly and fails.
   */
  void flush() throws IOException {
    runOrBuffer(flushAction);
  }

  /** Runs or buffers the complete action. Called from application thread. */
  void complete() {
    try {
      runOrBuffer(completeAction);
    } catch (IOException ignored) {
      // actually completeAction does not throw IOException
    }
  }

  /**
   * Drains the buffered write actions for as long as the output stream stays ready.
   *
   * <p>Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}.
   *
   * @throws IOException if a buffered action fails.
   */
  void onWritePossible() throws IOException {
    log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
    assureReadyAndDrainedTurnsFalse();
    while (isReady.getAsBoolean()) {
      // Snapshot the state BEFORE polling, so that a concurrent runOrBuffer() that enqueues after
      // the poll makes the compareAndSet below fail and forces another drain pass.
      WriteState curState = writeState.get();

      ActionItem actionItem = writeChain.poll();
      if (actionItem != null) {
        actionItem.run();
        continue;
      }

      if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
        // state has not changed since.
        log.finest(
            "onWritePossible: EXIT. All data available now is sent out and the servlet output"
                + " stream is still ready");
        return;
      }
      // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
      // again
    }
    log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
  }

  /**
   * Blocks the container thread until {@code readyAndDrained} is observed to be false.
   *
   * <p>readyAndDrained should have been set to false already. Just in case due to a race condition
   * readyAndDrained is still true at this moment and is being set to false by runOrBuffer()
   * concurrently, wait for runOrBuffer() to finish the transition and unpark this thread.
   */
  private void assureReadyAndDrainedTurnsFalse() {
    while (writeState.get().readyAndDrained) {
      // Publish this thread BEFORE the decisive state check. runOrBuffer() flips the state and
      // then reads parkingThread; if the thread were published only after the check, the flip and
      // unpark(null) could both slip in between and the park below would miss its wake-up.
      parkingThread = Thread.currentThread();
      if (writeState.get().readyAndDrained) {
        // Still true, so the flip has not happened yet and its unpark() is guaranteed to see
        // parkingThread. Try to sleep for an extremely long time to avoid writeState being
        // changed at exactly the time when sleep time expires (in extreme scenario, such as
        // #9917).
        LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
      }
    }
    parkingThread = null;
  }

  /**
   * Either execute the write action directly, or buffer the action and let the container thread
   * drain it.
   *
   * <p>Called from application thread.
   *
   * @throws IOException if the action is executed directly and fails.
   */
  private void runOrBuffer(ActionItem actionItem) throws IOException {
    WriteState curState = writeState.get();
    if (curState.readyAndDrained) { // write to the outputStream directly
      actionItem.run();
      if (actionItem == completeAction) {
        return;
      }
      if (!isReady.getAsBoolean()) {
        boolean successful =
            writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
        // Wake the container thread if it is waiting in assureReadyAndDrainedTurnsFalse();
        // unpark(null) is a no-op.
        LockSupport.unpark(parkingThread);
        checkState(successful, "Bug: curState is unexpectedly changed by another thread");
        log.finest("the servlet output stream becomes not ready");
      }
    } else { // buffer to the writeChain
      writeChain.offer(actionItem);
      if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
        // onWritePossible() finished draining and flipped the state to true while the item was
        // being enqueued, so nobody is left to drain it: take it back and run it ourselves.
        checkState(
            writeState.get().readyAndDrained,
            "Bug: onWritePossible() should have changed readyAndDrained to true, but not");
        ActionItem lastItem = writeChain.poll();
        if (lastItem != null) {
          checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
          runOrBuffer(lastItem);
        }
      } // state has not changed since
    }
  }

  /** Write actions, e.g. writeBytes, flush, complete. */
  @FunctionalInterface
  @VisibleForTesting
  interface ActionItem {
    /** Executes the action. */
    void run() throws IOException;
  }

  /** Minimal logging facade; both methods do nothing unless overridden. */
  @VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
  interface Log {
    default void fine(String str, Object... params) {}

    default void finest(String str, Object... params) {}
  }

  /** Immutable state object swapped atomically through {@code writeState}. */
  private static final class WriteState {

    static final WriteState DEFAULT = new WriteState(false);

    /**
     * The servlet output stream is ready and the writeChain is empty.
     *
     * <p>readyAndDrained turns from false to true when:
     * {@code onWritePossible()} exits while currently there is no more data to write, but the last
     * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
     *
     * <p>readyAndDrained turns from true to false when:
     * {@code runOrBuffer()} exits while either the action item is written directly to the
     * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
     * right after that returns false, or the action item is buffered into the writeChain.
     */
    final boolean readyAndDrained;

    WriteState(boolean readyAndDrained) {
      this.readyAndDrained = readyAndDrained;
    }

    /**
     * Returns a new state object carrying the given flag; a fresh instance is created on every
     * call.
     *
     * <p>Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
     * runOrBuffer()} can set it to false.
     */
    @CheckReturnValue
    WriteState withReadyAndDrained(boolean readyAndDrained) {
      return new WriteState(readyAndDrained);
    }
  }
}