1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import io.grpc.Codec; 24 import io.grpc.Compressor; 25 import io.grpc.Decompressor; 26 import io.perfmark.Link; 27 import io.perfmark.PerfMark; 28 import io.perfmark.TaskCloseable; 29 import java.io.InputStream; 30 import javax.annotation.concurrent.GuardedBy; 31 32 /** 33 * The stream and stream state as used by the application. Must only be called from the sending 34 * application thread. 35 */ 36 public abstract class AbstractStream implements Stream { 37 /** The framer to use for sending messages. */ framer()38 protected abstract Framer framer(); 39 40 /** 41 * Obtain the transport state corresponding to this stream. Each stream must have its own unique 42 * transport state. 43 */ transportState()44 protected abstract TransportState transportState(); 45 46 @Override optimizeForDirectExecutor()47 public void optimizeForDirectExecutor() { 48 transportState().optimizeForDirectExecutor(); 49 } 50 51 @Override setMessageCompression(boolean enable)52 public final void setMessageCompression(boolean enable) { 53 framer().setMessageCompression(enable); 54 } 55 56 @Override request(int numMessages)57 public final void request(int numMessages) { 58 transportState().requestMessagesFromDeframer(numMessages); 59 } 60 61 @Override writeMessage(InputStream message)62 public final void writeMessage(InputStream message) { 63 checkNotNull(message, "message"); 64 try { 65 if (!framer().isClosed()) { 66 framer().writePayload(message); 67 } 68 } finally { 69 GrpcUtil.closeQuietly(message); 70 } 71 } 72 73 @Override flush()74 public final void flush() { 75 if (!framer().isClosed()) { 76 framer().flush(); 77 } 78 } 79 80 /** 81 * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed 82 * (half closure on client; closure on server). 83 */ endOfMessages()84 protected final void endOfMessages() { 85 framer().close(); 86 } 87 88 @Override setCompressor(Compressor compressor)89 public final void setCompressor(Compressor compressor) { 90 framer().setCompressor(checkNotNull(compressor, "compressor")); 91 } 92 93 @Override isReady()94 public boolean isReady() { 95 return transportState().isReady(); 96 } 97 98 /** 99 * Event handler to be called by the subclass when a number of bytes are being queued for sending 100 * to the remote endpoint. 101 * 102 * @param numBytes the number of bytes being sent. 103 */ onSendingBytes(int numBytes)104 protected final void onSendingBytes(int numBytes) { 105 transportState().onSendingBytes(numBytes); 106 } 107 108 /** 109 * Stream state as used by the transport. This should only be called from the transport thread 110 * (except for private interactions with {@code AbstractStream}). 111 */ 112 public abstract static class TransportState 113 implements ApplicationThreadDeframer.TransportExecutor, MessageDeframer.Listener { 114 /** 115 * The default number of queued bytes for a given stream, below which 116 * {@link StreamListener#onReady()} will be called. 117 */ 118 @VisibleForTesting 119 public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024; 120 121 private Deframer deframer; 122 private final Object onReadyLock = new Object(); 123 private final StatsTraceContext statsTraceCtx; 124 private final TransportTracer transportTracer; 125 private final MessageDeframer rawDeframer; 126 127 /** 128 * The number of bytes currently queued, waiting to be sent. When this falls below 129 * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called. 130 */ 131 @GuardedBy("onReadyLock") 132 private int numSentBytesQueued; 133 /** 134 * Indicates the stream has been created on the connection. This implies that the stream is no 135 * longer limited by MAX_CONCURRENT_STREAMS. 136 */ 137 @GuardedBy("onReadyLock") 138 private boolean allocated; 139 /** 140 * Indicates that the stream no longer exists for the transport. Implies that the application 141 * should be discouraged from sending, because doing so would have no effect. 142 */ 143 @GuardedBy("onReadyLock") 144 private boolean deallocated; 145 TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)146 protected TransportState( 147 int maxMessageSize, 148 StatsTraceContext statsTraceCtx, 149 TransportTracer transportTracer) { 150 this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); 151 this.transportTracer = checkNotNull(transportTracer, "transportTracer"); 152 rawDeframer = new MessageDeframer( 153 this, 154 Codec.Identity.NONE, 155 maxMessageSize, 156 statsTraceCtx, 157 transportTracer); 158 // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break. 159 deframer = rawDeframer; 160 } 161 optimizeForDirectExecutor()162 final void optimizeForDirectExecutor() { 163 rawDeframer.setListener(this); 164 deframer = rawDeframer; 165 } 166 setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor)167 protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) { 168 rawDeframer.setFullStreamDecompressor(fullStreamDecompressor); 169 deframer = new ApplicationThreadDeframer(this, this, rawDeframer); 170 } 171 setMaxInboundMessageSize(int maxSize)172 final void setMaxInboundMessageSize(int maxSize) { 173 deframer.setMaxInboundMessageSize(maxSize); 174 } 175 176 /** 177 * Override this method to provide a stream listener. 178 */ listener()179 protected abstract StreamListener listener(); 180 181 @Override messagesAvailable(StreamListener.MessageProducer producer)182 public void messagesAvailable(StreamListener.MessageProducer producer) { 183 listener().messagesAvailable(producer); 184 } 185 186 /** 187 * Closes the deframer and frees any resources. After this method is called, additional calls 188 * will have no effect. 189 * 190 * <p>When {@code stopDelivery} is false, the deframer will wait to close until any already 191 * queued messages have been delivered. 192 * 193 * <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing. 194 * 195 * @param stopDelivery interrupt pending deliveries and close immediately 196 */ closeDeframer(boolean stopDelivery)197 protected final void closeDeframer(boolean stopDelivery) { 198 if (stopDelivery) { 199 deframer.close(); 200 } else { 201 deframer.closeWhenComplete(); 202 } 203 } 204 205 /** 206 * Called to parse a received frame and attempt delivery of any completed messages. Must be 207 * called from the transport thread. 208 */ deframe(final ReadableBuffer frame)209 protected final void deframe(final ReadableBuffer frame) { 210 try { 211 deframer.deframe(frame); 212 } catch (Throwable t) { 213 deframeFailed(t); 214 } 215 } 216 217 /** 218 * Called to request the given number of messages from the deframer. May be called from any 219 * thread. 220 */ requestMessagesFromDeframer(final int numMessages)221 private void requestMessagesFromDeframer(final int numMessages) { 222 if (deframer instanceof ThreadOptimizedDeframer) { 223 try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) { 224 deframer.request(numMessages); 225 } 226 return; 227 } 228 final Link link = PerfMark.linkOut(); 229 class RequestRunnable implements Runnable { 230 @Override public void run() { 231 try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) { 232 PerfMark.linkIn(link); 233 deframer.request(numMessages); 234 } catch (Throwable t) { 235 deframeFailed(t); 236 } 237 } 238 } 239 240 runOnTransportThread(new RequestRunnable()); 241 } 242 243 /** 244 * Very rarely used. Prefer stream.request() instead of this; this method is only necessary if 245 * a stream is not available. 246 */ 247 @VisibleForTesting requestMessagesFromDeframerForTesting(int numMessages)248 public final void requestMessagesFromDeframerForTesting(int numMessages) { 249 requestMessagesFromDeframer(numMessages); 250 } 251 getStatsTraceContext()252 public final StatsTraceContext getStatsTraceContext() { 253 return statsTraceCtx; 254 } 255 setDecompressor(Decompressor decompressor)256 protected final void setDecompressor(Decompressor decompressor) { 257 deframer.setDecompressor(decompressor); 258 } 259 isReady()260 private boolean isReady() { 261 synchronized (onReadyLock) { 262 return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated; 263 } 264 } 265 266 /** 267 * Event handler to be called by the subclass when the stream's headers have passed any 268 * connection flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link 269 * StreamListener#onReady()} handler if appropriate. This must be called from the transport 270 * thread, since the listener may be called back directly. 271 */ onStreamAllocated()272 protected void onStreamAllocated() { 273 checkState(listener() != null); 274 synchronized (onReadyLock) { 275 checkState(!allocated, "Already allocated"); 276 allocated = true; 277 } 278 notifyIfReady(); 279 } 280 281 /** 282 * Notify that the stream does not exist in a usable state any longer. This causes {@link 283 * AbstractStream#isReady()} to return {@code false} from this point forward. 284 * 285 * <p>This does not generally need to be called explicitly by the transport, as it is handled 286 * implicitly by {@link AbstractClientStream} and {@link AbstractServerStream}. 287 */ onStreamDeallocated()288 protected final void onStreamDeallocated() { 289 synchronized (onReadyLock) { 290 deallocated = true; 291 } 292 } 293 294 /** 295 * Event handler to be called by the subclass when a number of bytes are being queued for 296 * sending to the remote endpoint. 297 * 298 * @param numBytes the number of bytes being sent. 299 */ onSendingBytes(int numBytes)300 private void onSendingBytes(int numBytes) { 301 synchronized (onReadyLock) { 302 numSentBytesQueued += numBytes; 303 } 304 } 305 306 /** 307 * Event handler to be called by the subclass when a number of bytes has been sent to the remote 308 * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if 309 * appropriate. This must be called from the transport thread, since the listener may be called 310 * back directly. 311 * 312 * @param numBytes the number of bytes that were sent. 313 */ onSentBytes(int numBytes)314 public final void onSentBytes(int numBytes) { 315 boolean doNotify; 316 synchronized (onReadyLock) { 317 checkState(allocated, 318 "onStreamAllocated was not called, but it seems the stream is active"); 319 boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; 320 numSentBytesQueued -= numBytes; 321 boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; 322 doNotify = !belowThresholdBefore && belowThresholdAfter; 323 } 324 if (doNotify) { 325 notifyIfReady(); 326 } 327 } 328 329 protected TransportTracer getTransportTracer() { 330 return transportTracer; 331 } 332 333 private void notifyIfReady() { 334 boolean doNotify; 335 synchronized (onReadyLock) { 336 doNotify = isReady(); 337 } 338 if (doNotify) { 339 listener().onReady(); 340 } 341 } 342 } 343 } 344