1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import io.grpc.Codec; 24 import io.grpc.Compressor; 25 import io.grpc.Decompressor; 26 import java.io.InputStream; 27 import javax.annotation.concurrent.GuardedBy; 28 29 /** 30 * The stream and stream state as used by the application. Must only be called from the sending 31 * application thread. 32 */ 33 public abstract class AbstractStream implements Stream { 34 /** The framer to use for sending messages. */ framer()35 protected abstract Framer framer(); 36 37 /** 38 * Obtain the transport state corresponding to this stream. Each stream must have its own unique 39 * transport state. 40 */ transportState()41 protected abstract TransportState transportState(); 42 43 @Override setMessageCompression(boolean enable)44 public final void setMessageCompression(boolean enable) { 45 framer().setMessageCompression(enable); 46 } 47 48 @Override writeMessage(InputStream message)49 public final void writeMessage(InputStream message) { 50 checkNotNull(message, "message"); 51 try { 52 if (!framer().isClosed()) { 53 framer().writePayload(message); 54 } 55 } finally { 56 GrpcUtil.closeQuietly(message); 57 } 58 } 59 60 @Override flush()61 public final void flush() { 62 if (!framer().isClosed()) { 63 framer().flush(); 64 } 65 } 66 67 /** 68 * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed 69 * (half closure on client; closure on server). 70 */ endOfMessages()71 protected final void endOfMessages() { 72 framer().close(); 73 } 74 75 @Override setCompressor(Compressor compressor)76 public final void setCompressor(Compressor compressor) { 77 framer().setCompressor(checkNotNull(compressor, "compressor")); 78 } 79 80 @Override isReady()81 public boolean isReady() { 82 if (framer().isClosed()) { 83 return false; 84 } 85 return transportState().isReady(); 86 } 87 88 /** 89 * Event handler to be called by the subclass when a number of bytes are being queued for sending 90 * to the remote endpoint. 91 * 92 * @param numBytes the number of bytes being sent. 93 */ onSendingBytes(int numBytes)94 protected final void onSendingBytes(int numBytes) { 95 transportState().onSendingBytes(numBytes); 96 } 97 98 /** 99 * Stream state as used by the transport. This should only called from the transport thread 100 * (except for private interactions with {@code AbstractStream}). 101 */ 102 public abstract static class TransportState 103 implements ApplicationThreadDeframer.TransportExecutor, MessageDeframer.Listener { 104 /** 105 * The default number of queued bytes for a given stream, below which 106 * {@link StreamListener#onReady()} will be called. 107 */ 108 @VisibleForTesting 109 public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024; 110 111 private Deframer deframer; 112 private final Object onReadyLock = new Object(); 113 private final StatsTraceContext statsTraceCtx; 114 private final TransportTracer transportTracer; 115 116 /** 117 * The number of bytes currently queued, waiting to be sent. When this falls below 118 * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called. 119 */ 120 @GuardedBy("onReadyLock") 121 private int numSentBytesQueued; 122 /** 123 * Indicates the stream has been created on the connection. This implies that the stream is no 124 * longer limited by MAX_CONCURRENT_STREAMS. 125 */ 126 @GuardedBy("onReadyLock") 127 private boolean allocated; 128 /** 129 * Indicates that the stream no longer exists for the transport. Implies that the application 130 * should be discouraged from sending, because doing so would have no effect. 131 */ 132 @GuardedBy("onReadyLock") 133 private boolean deallocated; 134 TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)135 protected TransportState( 136 int maxMessageSize, 137 StatsTraceContext statsTraceCtx, 138 TransportTracer transportTracer) { 139 this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); 140 this.transportTracer = checkNotNull(transportTracer, "transportTracer"); 141 deframer = new MessageDeframer( 142 this, 143 Codec.Identity.NONE, 144 maxMessageSize, 145 statsTraceCtx, 146 transportTracer); 147 } 148 setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor)149 protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) { 150 deframer.setFullStreamDecompressor(fullStreamDecompressor); 151 deframer = new ApplicationThreadDeframer(this, this, (MessageDeframer) deframer); 152 } 153 setMaxInboundMessageSize(int maxSize)154 final void setMaxInboundMessageSize(int maxSize) { 155 deframer.setMaxInboundMessageSize(maxSize); 156 } 157 158 /** 159 * Override this method to provide a stream listener. 160 */ listener()161 protected abstract StreamListener listener(); 162 163 @Override messagesAvailable(StreamListener.MessageProducer producer)164 public void messagesAvailable(StreamListener.MessageProducer producer) { 165 listener().messagesAvailable(producer); 166 } 167 168 /** 169 * Closes the deframer and frees any resources. After this method is called, additional calls 170 * will have no effect. 171 * 172 * <p>When {@code stopDelivery} is false, the deframer will wait to close until any already 173 * queued messages have been delivered. 174 * 175 * <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing. 176 * 177 * @param stopDelivery interrupt pending deliveries and close immediately 178 */ closeDeframer(boolean stopDelivery)179 protected final void closeDeframer(boolean stopDelivery) { 180 if (stopDelivery) { 181 deframer.close(); 182 } else { 183 deframer.closeWhenComplete(); 184 } 185 } 186 187 /** 188 * Called to parse a received frame and attempt delivery of any completed messages. Must be 189 * called from the transport thread. 190 */ deframe(final ReadableBuffer frame)191 protected final void deframe(final ReadableBuffer frame) { 192 try { 193 deframer.deframe(frame); 194 } catch (Throwable t) { 195 deframeFailed(t); 196 } 197 } 198 199 /** 200 * Called to request the given number of messages from the deframer. Must be called from the 201 * transport thread. 202 */ requestMessagesFromDeframer(final int numMessages)203 public final void requestMessagesFromDeframer(final int numMessages) { 204 try { 205 deframer.request(numMessages); 206 } catch (Throwable t) { 207 deframeFailed(t); 208 } 209 } 210 getStatsTraceContext()211 public final StatsTraceContext getStatsTraceContext() { 212 return statsTraceCtx; 213 } 214 setDecompressor(Decompressor decompressor)215 protected final void setDecompressor(Decompressor decompressor) { 216 deframer.setDecompressor(decompressor); 217 } 218 isReady()219 private boolean isReady() { 220 synchronized (onReadyLock) { 221 return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated; 222 } 223 } 224 225 /** 226 * Event handler to be called by the subclass when the stream's headers have passed any 227 * connection flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link 228 * StreamListener#onReady()} handler if appropriate. This must be called from the transport 229 * thread, since the listener may be called back directly. 230 */ onStreamAllocated()231 protected void onStreamAllocated() { 232 checkState(listener() != null); 233 synchronized (onReadyLock) { 234 checkState(!allocated, "Already allocated"); 235 allocated = true; 236 } 237 notifyIfReady(); 238 } 239 240 /** 241 * Notify that the stream does not exist in a usable state any longer. This causes {@link 242 * AbstractStream#isReady()} to return {@code false} from this point forward. 243 * 244 * <p>This does not generally need to be called explicitly by the transport, as it is handled 245 * implicitly by {@link AbstractClientStream} and {@link AbstractServerStream}. 246 */ onStreamDeallocated()247 protected final void onStreamDeallocated() { 248 synchronized (onReadyLock) { 249 deallocated = true; 250 } 251 } 252 253 /** 254 * Event handler to be called by the subclass when a number of bytes are being queued for 255 * sending to the remote endpoint. 256 * 257 * @param numBytes the number of bytes being sent. 258 */ onSendingBytes(int numBytes)259 private void onSendingBytes(int numBytes) { 260 synchronized (onReadyLock) { 261 numSentBytesQueued += numBytes; 262 } 263 } 264 265 /** 266 * Event handler to be called by the subclass when a number of bytes has been sent to the remote 267 * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if 268 * appropriate. This must be called from the transport thread, since the listener may be called 269 * back directly. 270 * 271 * @param numBytes the number of bytes that were sent. 272 */ onSentBytes(int numBytes)273 public final void onSentBytes(int numBytes) { 274 boolean doNotify; 275 synchronized (onReadyLock) { 276 checkState(allocated, 277 "onStreamAllocated was not called, but it seems the stream is active"); 278 boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; 279 numSentBytesQueued -= numBytes; 280 boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; 281 doNotify = !belowThresholdBefore && belowThresholdAfter; 282 } 283 if (doNotify) { 284 notifyIfReady(); 285 } 286 } 287 288 protected TransportTracer getTransportTracer() { 289 return transportTracer; 290 } 291 292 private void notifyIfReady() { 293 boolean doNotify; 294 synchronized (onReadyLock) { 295 doNotify = isReady(); 296 } 297 if (doNotify) { 298 listener().onReady(); 299 } 300 } 301 } 302 } 303