• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import io.grpc.Codec;
24 import io.grpc.Compressor;
25 import io.grpc.Decompressor;
26 import java.io.InputStream;
27 import javax.annotation.concurrent.GuardedBy;
28 
29 /**
30  * The stream and stream state as used by the application. Must only be called from the sending
31  * application thread.
32  */
33 public abstract class AbstractStream implements Stream {
34   /** The framer to use for sending messages. */
framer()35   protected abstract Framer framer();
36 
37   /**
38    * Obtain the transport state corresponding to this stream. Each stream must have its own unique
39    * transport state.
40    */
transportState()41   protected abstract TransportState transportState();
42 
43   @Override
setMessageCompression(boolean enable)44   public final void setMessageCompression(boolean enable) {
45     framer().setMessageCompression(enable);
46   }
47 
48   @Override
writeMessage(InputStream message)49   public final void writeMessage(InputStream message) {
50     checkNotNull(message, "message");
51     try {
52       if (!framer().isClosed()) {
53         framer().writePayload(message);
54       }
55     } finally {
56       GrpcUtil.closeQuietly(message);
57     }
58   }
59 
60   @Override
flush()61   public final void flush() {
62     if (!framer().isClosed()) {
63       framer().flush();
64     }
65   }
66 
67   /**
68    * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
69    * (half closure on client; closure on server).
70    */
endOfMessages()71   protected final void endOfMessages() {
72     framer().close();
73   }
74 
75   @Override
setCompressor(Compressor compressor)76   public final void setCompressor(Compressor compressor) {
77     framer().setCompressor(checkNotNull(compressor, "compressor"));
78   }
79 
80   @Override
isReady()81   public boolean isReady() {
82     if (framer().isClosed()) {
83       return false;
84     }
85     return transportState().isReady();
86   }
87 
88   /**
89    * Event handler to be called by the subclass when a number of bytes are being queued for sending
90    * to the remote endpoint.
91    *
92    * @param numBytes the number of bytes being sent.
93    */
onSendingBytes(int numBytes)94   protected final void onSendingBytes(int numBytes) {
95     transportState().onSendingBytes(numBytes);
96   }
97 
98   /**
99    * Stream state as used by the transport. This should only called from the transport thread
100    * (except for private interactions with {@code AbstractStream}).
101    */
102   public abstract static class TransportState
103       implements ApplicationThreadDeframer.TransportExecutor, MessageDeframer.Listener {
104     /**
105      * The default number of queued bytes for a given stream, below which
106      * {@link StreamListener#onReady()} will be called.
107      */
108     @VisibleForTesting
109     public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
110 
111     private Deframer deframer;
112     private final Object onReadyLock = new Object();
113     private final StatsTraceContext statsTraceCtx;
114     private final TransportTracer transportTracer;
115 
116     /**
117      * The number of bytes currently queued, waiting to be sent. When this falls below
118      * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called.
119      */
120     @GuardedBy("onReadyLock")
121     private int numSentBytesQueued;
122     /**
123      * Indicates the stream has been created on the connection. This implies that the stream is no
124      * longer limited by MAX_CONCURRENT_STREAMS.
125      */
126     @GuardedBy("onReadyLock")
127     private boolean allocated;
128     /**
129      * Indicates that the stream no longer exists for the transport. Implies that the application
130      * should be discouraged from sending, because doing so would have no effect.
131      */
132     @GuardedBy("onReadyLock")
133     private boolean deallocated;
134 
TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)135     protected TransportState(
136         int maxMessageSize,
137         StatsTraceContext statsTraceCtx,
138         TransportTracer transportTracer) {
139       this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
140       this.transportTracer = checkNotNull(transportTracer, "transportTracer");
141       deframer = new MessageDeframer(
142           this,
143           Codec.Identity.NONE,
144           maxMessageSize,
145           statsTraceCtx,
146           transportTracer);
147     }
148 
setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor)149     protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
150       deframer.setFullStreamDecompressor(fullStreamDecompressor);
151       deframer = new ApplicationThreadDeframer(this, this, (MessageDeframer) deframer);
152     }
153 
setMaxInboundMessageSize(int maxSize)154     final void setMaxInboundMessageSize(int maxSize) {
155       deframer.setMaxInboundMessageSize(maxSize);
156     }
157 
158     /**
159      * Override this method to provide a stream listener.
160      */
listener()161     protected abstract StreamListener listener();
162 
163     @Override
messagesAvailable(StreamListener.MessageProducer producer)164     public void messagesAvailable(StreamListener.MessageProducer producer) {
165       listener().messagesAvailable(producer);
166     }
167 
168     /**
169      * Closes the deframer and frees any resources. After this method is called, additional calls
170      * will have no effect.
171      *
172      * <p>When {@code stopDelivery} is false, the deframer will wait to close until any already
173      * queued messages have been delivered.
174      *
175      * <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing.
176      *
177      * @param stopDelivery interrupt pending deliveries and close immediately
178      */
closeDeframer(boolean stopDelivery)179     protected final void closeDeframer(boolean stopDelivery) {
180       if (stopDelivery) {
181         deframer.close();
182       } else {
183         deframer.closeWhenComplete();
184       }
185     }
186 
187     /**
188      * Called to parse a received frame and attempt delivery of any completed messages. Must be
189      * called from the transport thread.
190      */
deframe(final ReadableBuffer frame)191     protected final void deframe(final ReadableBuffer frame) {
192       try {
193         deframer.deframe(frame);
194       } catch (Throwable t) {
195         deframeFailed(t);
196       }
197     }
198 
199     /**
200      * Called to request the given number of messages from the deframer. Must be called from the
201      * transport thread.
202      */
requestMessagesFromDeframer(final int numMessages)203     public final void requestMessagesFromDeframer(final int numMessages) {
204       try {
205         deframer.request(numMessages);
206       } catch (Throwable t) {
207         deframeFailed(t);
208       }
209     }
210 
getStatsTraceContext()211     public final StatsTraceContext getStatsTraceContext() {
212       return statsTraceCtx;
213     }
214 
setDecompressor(Decompressor decompressor)215     protected final void setDecompressor(Decompressor decompressor) {
216       deframer.setDecompressor(decompressor);
217     }
218 
isReady()219     private boolean isReady() {
220       synchronized (onReadyLock) {
221         return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated;
222       }
223     }
224 
225     /**
226      * Event handler to be called by the subclass when the stream's headers have passed any
227      * connection flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link
228      * StreamListener#onReady()} handler if appropriate. This must be called from the transport
229      * thread, since the listener may be called back directly.
230      */
onStreamAllocated()231     protected void onStreamAllocated() {
232       checkState(listener() != null);
233       synchronized (onReadyLock) {
234         checkState(!allocated, "Already allocated");
235         allocated = true;
236       }
237       notifyIfReady();
238     }
239 
240     /**
241      * Notify that the stream does not exist in a usable state any longer. This causes {@link
242      * AbstractStream#isReady()} to return {@code false} from this point forward.
243      *
244      * <p>This does not generally need to be called explicitly by the transport, as it is handled
245      * implicitly by {@link AbstractClientStream} and {@link AbstractServerStream}.
246      */
onStreamDeallocated()247     protected final void onStreamDeallocated() {
248       synchronized (onReadyLock) {
249         deallocated = true;
250       }
251     }
252 
253     /**
254      * Event handler to be called by the subclass when a number of bytes are being queued for
255      * sending to the remote endpoint.
256      *
257      * @param numBytes the number of bytes being sent.
258      */
onSendingBytes(int numBytes)259     private void onSendingBytes(int numBytes) {
260       synchronized (onReadyLock) {
261         numSentBytesQueued += numBytes;
262       }
263     }
264 
265     /**
266      * Event handler to be called by the subclass when a number of bytes has been sent to the remote
267      * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if
268      * appropriate.  This must be called from the transport thread, since the listener may be called
269      * back directly.
270      *
271      * @param numBytes the number of bytes that were sent.
272      */
onSentBytes(int numBytes)273     public final void onSentBytes(int numBytes) {
274       boolean doNotify;
275       synchronized (onReadyLock) {
276         checkState(allocated,
277             "onStreamAllocated was not called, but it seems the stream is active");
278         boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
279         numSentBytesQueued -= numBytes;
280         boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
281         doNotify = !belowThresholdBefore && belowThresholdAfter;
282       }
283       if (doNotify) {
284         notifyIfReady();
285       }
286     }
287 
288     protected TransportTracer getTransportTracer() {
289       return transportTracer;
290     }
291 
292     private void notifyIfReady() {
293       boolean doNotify;
294       synchronized (onReadyLock) {
295         doNotify = isReady();
296       }
297       if (doNotify) {
298         listener().onReady();
299       }
300     }
301   }
302 }
303