• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import io.grpc.Codec;
24 import io.grpc.Compressor;
25 import io.grpc.Decompressor;
26 import io.perfmark.Link;
27 import io.perfmark.PerfMark;
28 import io.perfmark.TaskCloseable;
29 import java.io.InputStream;
30 import javax.annotation.concurrent.GuardedBy;
31 
32 /**
33  * The stream and stream state as used by the application. Must only be called from the sending
34  * application thread.
35  */
36 public abstract class AbstractStream implements Stream {
37   /** The framer to use for sending messages. */
framer()38   protected abstract Framer framer();
39 
40   /**
41    * Obtain the transport state corresponding to this stream. Each stream must have its own unique
42    * transport state.
43    */
transportState()44   protected abstract TransportState transportState();
45 
46   @Override
optimizeForDirectExecutor()47   public void optimizeForDirectExecutor() {
48     transportState().optimizeForDirectExecutor();
49   }
50 
51   @Override
setMessageCompression(boolean enable)52   public final void setMessageCompression(boolean enable) {
53     framer().setMessageCompression(enable);
54   }
55 
56   @Override
request(int numMessages)57   public final void request(int numMessages) {
58     transportState().requestMessagesFromDeframer(numMessages);
59   }
60 
61   @Override
writeMessage(InputStream message)62   public final void writeMessage(InputStream message) {
63     checkNotNull(message, "message");
64     try {
65       if (!framer().isClosed()) {
66         framer().writePayload(message);
67       }
68     } finally {
69       GrpcUtil.closeQuietly(message);
70     }
71   }
72 
73   @Override
flush()74   public final void flush() {
75     if (!framer().isClosed()) {
76       framer().flush();
77     }
78   }
79 
80   /**
81    * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
82    * (half closure on client; closure on server).
83    */
endOfMessages()84   protected final void endOfMessages() {
85     framer().close();
86   }
87 
88   @Override
setCompressor(Compressor compressor)89   public final void setCompressor(Compressor compressor) {
90     framer().setCompressor(checkNotNull(compressor, "compressor"));
91   }
92 
93   @Override
isReady()94   public boolean isReady() {
95     return transportState().isReady();
96   }
97 
98   /**
99    * Event handler to be called by the subclass when a number of bytes are being queued for sending
100    * to the remote endpoint.
101    *
102    * @param numBytes the number of bytes being sent.
103    */
onSendingBytes(int numBytes)104   protected final void onSendingBytes(int numBytes) {
105     transportState().onSendingBytes(numBytes);
106   }
107 
108   /**
109    * Stream state as used by the transport. This should only be called from the transport thread
110    * (except for private interactions with {@code AbstractStream}).
111    */
112   public abstract static class TransportState
113       implements ApplicationThreadDeframer.TransportExecutor, MessageDeframer.Listener {
114     /**
115      * The default number of queued bytes for a given stream, below which
116      * {@link StreamListener#onReady()} will be called.
117      */
118     @VisibleForTesting
119     public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
120 
121     private Deframer deframer;
122     private final Object onReadyLock = new Object();
123     private final StatsTraceContext statsTraceCtx;
124     private final TransportTracer transportTracer;
125     private final MessageDeframer rawDeframer;
126 
127     /**
128      * The number of bytes currently queued, waiting to be sent. When this falls below
129      * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called.
130      */
131     @GuardedBy("onReadyLock")
132     private int numSentBytesQueued;
133     /**
134      * Indicates the stream has been created on the connection. This implies that the stream is no
135      * longer limited by MAX_CONCURRENT_STREAMS.
136      */
137     @GuardedBy("onReadyLock")
138     private boolean allocated;
139     /**
140      * Indicates that the stream no longer exists for the transport. Implies that the application
141      * should be discouraged from sending, because doing so would have no effect.
142      */
143     @GuardedBy("onReadyLock")
144     private boolean deallocated;
145 
TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)146     protected TransportState(
147         int maxMessageSize,
148         StatsTraceContext statsTraceCtx,
149         TransportTracer transportTracer) {
150       this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
151       this.transportTracer = checkNotNull(transportTracer, "transportTracer");
152       rawDeframer = new MessageDeframer(
153           this,
154           Codec.Identity.NONE,
155           maxMessageSize,
156           statsTraceCtx,
157           transportTracer);
158       // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
159       deframer = rawDeframer;
160     }
161 
optimizeForDirectExecutor()162     final void optimizeForDirectExecutor() {
163       rawDeframer.setListener(this);
164       deframer = rawDeframer;
165     }
166 
setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor)167     protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
168       rawDeframer.setFullStreamDecompressor(fullStreamDecompressor);
169       deframer = new ApplicationThreadDeframer(this, this, rawDeframer);
170     }
171 
setMaxInboundMessageSize(int maxSize)172     final void setMaxInboundMessageSize(int maxSize) {
173       deframer.setMaxInboundMessageSize(maxSize);
174     }
175 
176     /**
177      * Override this method to provide a stream listener.
178      */
listener()179     protected abstract StreamListener listener();
180 
181     @Override
messagesAvailable(StreamListener.MessageProducer producer)182     public void messagesAvailable(StreamListener.MessageProducer producer) {
183       listener().messagesAvailable(producer);
184     }
185 
186     /**
187      * Closes the deframer and frees any resources. After this method is called, additional calls
188      * will have no effect.
189      *
190      * <p>When {@code stopDelivery} is false, the deframer will wait to close until any already
191      * queued messages have been delivered.
192      *
193      * <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing.
194      *
195      * @param stopDelivery interrupt pending deliveries and close immediately
196      */
closeDeframer(boolean stopDelivery)197     protected final void closeDeframer(boolean stopDelivery) {
198       if (stopDelivery) {
199         deframer.close();
200       } else {
201         deframer.closeWhenComplete();
202       }
203     }
204 
205     /**
206      * Called to parse a received frame and attempt delivery of any completed messages. Must be
207      * called from the transport thread.
208      */
deframe(final ReadableBuffer frame)209     protected final void deframe(final ReadableBuffer frame) {
210       try {
211         deframer.deframe(frame);
212       } catch (Throwable t) {
213         deframeFailed(t);
214       }
215     }
216 
217     /**
218      * Called to request the given number of messages from the deframer. May be called from any
219      * thread.
220      */
requestMessagesFromDeframer(final int numMessages)221     private void requestMessagesFromDeframer(final int numMessages) {
222       if (deframer instanceof ThreadOptimizedDeframer) {
223         try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
224           deframer.request(numMessages);
225         }
226         return;
227       }
228       final Link link = PerfMark.linkOut();
229       class RequestRunnable implements Runnable {
230         @Override public void run() {
231           try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
232             PerfMark.linkIn(link);
233             deframer.request(numMessages);
234           } catch (Throwable t) {
235             deframeFailed(t);
236           }
237         }
238       }
239 
240       runOnTransportThread(new RequestRunnable());
241     }
242 
243     /**
244      * Very rarely used. Prefer stream.request() instead of this; this method is only necessary if
245      * a stream is not available.
246      */
247     @VisibleForTesting
requestMessagesFromDeframerForTesting(int numMessages)248     public final void requestMessagesFromDeframerForTesting(int numMessages) {
249       requestMessagesFromDeframer(numMessages);
250     }
251 
getStatsTraceContext()252     public final StatsTraceContext getStatsTraceContext() {
253       return statsTraceCtx;
254     }
255 
setDecompressor(Decompressor decompressor)256     protected final void setDecompressor(Decompressor decompressor) {
257       deframer.setDecompressor(decompressor);
258     }
259 
isReady()260     private boolean isReady() {
261       synchronized (onReadyLock) {
262         return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated;
263       }
264     }
265 
266     /**
267      * Event handler to be called by the subclass when the stream's headers have passed any
268      * connection flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link
269      * StreamListener#onReady()} handler if appropriate. This must be called from the transport
270      * thread, since the listener may be called back directly.
271      */
onStreamAllocated()272     protected void onStreamAllocated() {
273       checkState(listener() != null);
274       synchronized (onReadyLock) {
275         checkState(!allocated, "Already allocated");
276         allocated = true;
277       }
278       notifyIfReady();
279     }
280 
281     /**
282      * Notify that the stream does not exist in a usable state any longer. This causes {@link
283      * AbstractStream#isReady()} to return {@code false} from this point forward.
284      *
285      * <p>This does not generally need to be called explicitly by the transport, as it is handled
286      * implicitly by {@link AbstractClientStream} and {@link AbstractServerStream}.
287      */
onStreamDeallocated()288     protected final void onStreamDeallocated() {
289       synchronized (onReadyLock) {
290         deallocated = true;
291       }
292     }
293 
294     /**
295      * Event handler to be called by the subclass when a number of bytes are being queued for
296      * sending to the remote endpoint.
297      *
298      * @param numBytes the number of bytes being sent.
299      */
onSendingBytes(int numBytes)300     private void onSendingBytes(int numBytes) {
301       synchronized (onReadyLock) {
302         numSentBytesQueued += numBytes;
303       }
304     }
305 
306     /**
307      * Event handler to be called by the subclass when a number of bytes has been sent to the remote
308      * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if
309      * appropriate.  This must be called from the transport thread, since the listener may be called
310      * back directly.
311      *
312      * @param numBytes the number of bytes that were sent.
313      */
onSentBytes(int numBytes)314     public final void onSentBytes(int numBytes) {
315       boolean doNotify;
316       synchronized (onReadyLock) {
317         checkState(allocated,
318             "onStreamAllocated was not called, but it seems the stream is active");
319         boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
320         numSentBytesQueued -= numBytes;
321         boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
322         doNotify = !belowThresholdBefore && belowThresholdAfter;
323       }
324       if (doNotify) {
325         notifyIfReady();
326       }
327     }
328 
329     protected TransportTracer getTransportTracer() {
330       return transportTracer;
331     }
332 
333     private void notifyIfReady() {
334       boolean doNotify;
335       synchronized (onReadyLock) {
336         doNotify = isReady();
337       }
338       if (doNotify) {
339         listener().onReady();
340       }
341     }
342   }
343 }
344