/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.servlet;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static io.grpc.servlet.ServletServerStream.toHexString;
21 import static java.util.logging.Level.FINE;
22 import static java.util.logging.Level.FINEST;
23 
24 import com.google.common.annotations.VisibleForTesting;
25 import io.grpc.InternalLogId;
26 import io.grpc.servlet.ServletServerStream.ServletTransportState;
27 import java.io.IOException;
28 import java.time.Duration;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.atomic.AtomicReference;
32 import java.util.concurrent.locks.LockSupport;
33 import java.util.function.BiFunction;
34 import java.util.function.BooleanSupplier;
35 import java.util.logging.Logger;
36 import javax.annotation.CheckReturnValue;
37 import javax.annotation.Nullable;
38 import javax.servlet.AsyncContext;
39 import javax.servlet.ServletOutputStream;
40 
41 /** Handles write actions from the container thread and the application thread. */
42 final class AsyncServletOutputStreamWriter {
43 
44   /**
45    * Memory boundary for write actions.
46    *
47    * <pre>
48    * WriteState curState = writeState.get();  // mark a boundary
49    * doSomething();  // do something within the boundary
50    * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
51    * if (successful) {
52    *   // state has not changed since
53    *   return;
54    * } else {
55    *   // state is changed by another thread while doSomething(), need recompute
56    * }
57    * </pre>
58    *
59    * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
60    * application thread (calling {@code runOrBuffer()}) that read and update the
61    * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
62    * only runOrBuffer() may turn it from true to false.
63    */
64   private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
65 
66   private final Log log;
67   private final BiFunction<byte[], Integer, ActionItem> writeAction;
68   private final ActionItem flushAction;
69   private final ActionItem completeAction;
70   private final BooleanSupplier isReady;
71 
72   /**
73    * New write actions will be buffered into this queue if the servlet output stream is not ready or
74    * the queue is not drained.
75    */
76   // SPSC queue would do
77   private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
78   // for a theoretical race condition that onWritePossible() is called immediately after isReady()
79   // returns false and before writeState.compareAndSet()
80   @Nullable
81   private volatile Thread parkingThread;
82 
AsyncServletOutputStreamWriter( AsyncContext asyncContext, ServletTransportState transportState, InternalLogId logId)83   AsyncServletOutputStreamWriter(
84       AsyncContext asyncContext,
85       ServletTransportState transportState,
86       InternalLogId logId) throws IOException {
87     Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
88     this.log = new Log() {
89       @Override
90       public void fine(String str, Object... params) {
91         if (logger.isLoggable(FINE)) {
92           logger.log(FINE, "[" + logId + "]" + str, params);
93         }
94       }
95 
96       @Override
97       public void finest(String str, Object... params) {
98         if (logger.isLoggable(FINEST)) {
99           logger.log(FINEST, "[" + logId + "] " + str, params);
100         }
101       }
102     };
103 
104     ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
105     this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
106       outputStream.write(bytes, 0, numBytes);
107       transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
108       log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
109     };
110     this.flushAction = () -> {
111       log.finest("flushBuffer");
112       asyncContext.getResponse().flushBuffer();
113     };
114     this.completeAction = () -> {
115       log.fine("call is completing");
116       transportState.runOnTransportThread(
117           () -> {
118             transportState.complete();
119             asyncContext.complete();
120             log.fine("call completed");
121           });
122     };
123     this.isReady = () -> outputStream.isReady();
124   }
125 
126   /**
127    * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run.
128    *
129    * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
130    * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
131    */
132   @VisibleForTesting
AsyncServletOutputStreamWriter( BiFunction<byte[], Integer, ActionItem> writeAction, ActionItem flushAction, ActionItem completeAction, BooleanSupplier isReady, Log log)133   AsyncServletOutputStreamWriter(
134       BiFunction<byte[], Integer, ActionItem> writeAction,
135       ActionItem flushAction,
136       ActionItem completeAction,
137       BooleanSupplier isReady,
138       Log log) {
139     this.writeAction = writeAction;
140     this.flushAction = flushAction;
141     this.completeAction = completeAction;
142     this.isReady = isReady;
143     this.log = log;
144   }
145 
146   /** Called from application thread. */
writeBytes(byte[] bytes, int numBytes)147   void writeBytes(byte[] bytes, int numBytes) throws IOException {
148     runOrBuffer(writeAction.apply(bytes, numBytes));
149   }
150 
151   /** Called from application thread. */
flush()152   void flush() throws IOException {
153     runOrBuffer(flushAction);
154   }
155 
156   /** Called from application thread. */
complete()157   void complete() {
158     try {
159       runOrBuffer(completeAction);
160     } catch (IOException ignore) {
161       // actually completeAction does not throw IOException
162     }
163   }
164 
165   /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
onWritePossible()166   void onWritePossible() throws IOException {
167     log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
168     assureReadyAndDrainedTurnsFalse();
169     while (isReady.getAsBoolean()) {
170       WriteState curState = writeState.get();
171 
172       ActionItem actionItem = writeChain.poll();
173       if (actionItem != null) {
174         actionItem.run();
175         continue;
176       }
177 
178       if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
179         // state has not changed since.
180         log.finest(
181             "onWritePossible: EXIT. All data available now is sent out and the servlet output"
182                 + " stream is still ready");
183         return;
184       }
185       // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
186       // again
187     }
188     log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
189   }
190 
assureReadyAndDrainedTurnsFalse()191   private void assureReadyAndDrainedTurnsFalse() {
192     // readyAndDrained should have been set to false already.
193     // Just in case due to a race condition readyAndDrained is still true at this moment and is
194     // being set to false by runOrBuffer() concurrently.
195     while (writeState.get().readyAndDrained) {
196       parkingThread = Thread.currentThread();
197       // Try to sleep for an extremely long time to avoid writeState being changed at exactly
198       // the time when sleep time expires (in extreme scenario, such as #9917).
199       LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
200     }
201     parkingThread = null;
202   }
203 
204   /**
205    * Either execute the write action directly, or buffer the action and let the container thread
206    * drain it.
207    *
208    * <p>Called from application thread.
209    */
runOrBuffer(ActionItem actionItem)210   private void runOrBuffer(ActionItem actionItem) throws IOException {
211     WriteState curState = writeState.get();
212     if (curState.readyAndDrained) { // write to the outputStream directly
213       actionItem.run();
214       if (actionItem == completeAction) {
215         return;
216       }
217       if (!isReady.getAsBoolean()) {
218         boolean successful =
219             writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
220         LockSupport.unpark(parkingThread);
221         checkState(successful, "Bug: curState is unexpectedly changed by another thread");
222         log.finest("the servlet output stream becomes not ready");
223       }
224     } else { // buffer to the writeChain
225       writeChain.offer(actionItem);
226       if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
227         checkState(
228             writeState.get().readyAndDrained,
229             "Bug: onWritePossible() should have changed readyAndDrained to true, but not");
230         ActionItem lastItem = writeChain.poll();
231         if (lastItem != null) {
232           checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
233           runOrBuffer(lastItem);
234         }
235       } // state has not changed since
236     }
237   }
238 
239   /** Write actions, e.g. writeBytes, flush, complete. */
240   @FunctionalInterface
241   @VisibleForTesting
242   interface ActionItem {
run()243     void run() throws IOException;
244   }
245 
246   @VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
247   interface Log {
fine(String str, Object...params)248     default void fine(String str, Object...params) {}
249 
finest(String str, Object...params)250     default void finest(String str, Object...params) {}
251   }
252 
253   private static final class WriteState {
254 
255     static final WriteState DEFAULT = new WriteState(false);
256 
257     /**
258      * The servlet output stream is ready and the writeChain is empty.
259      *
260      * <p>readyAndDrained turns from false to true when:
261      * {@code onWritePossible()} exits while currently there is no more data to write, but the last
262      * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
263      *
264      * <p>readyAndDrained turns from true to false when:
265      * {@code runOrBuffer()} exits while either the action item is written directly to the
266      * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
267      * right after that returns false, or the action item is buffered into the writeChain.
268      */
269     final boolean readyAndDrained;
270 
WriteState(boolean readyAndDrained)271     WriteState(boolean readyAndDrained) {
272       this.readyAndDrained = readyAndDrained;
273     }
274 
275     /**
276      * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
277      * runOrBuffer()} can set it to false.
278      */
279     @CheckReturnValue
withReadyAndDrained(boolean readyAndDrained)280     WriteState withReadyAndDrained(boolean readyAndDrained) {
281       return new WriteState(readyAndDrained);
282     }
283   }
284 }
285