• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23 import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24 import static java.lang.Math.max;
25 
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Preconditions;
28 import com.google.common.io.ByteStreams;
29 import io.grpc.Attributes;
30 import io.grpc.CallOptions;
31 import io.grpc.Codec;
32 import io.grpc.Compressor;
33 import io.grpc.Deadline;
34 import io.grpc.Decompressor;
35 import io.grpc.DecompressorRegistry;
36 import io.grpc.Grpc;
37 import io.grpc.Metadata;
38 import io.grpc.Status;
39 import io.grpc.internal.ClientStreamListener.RpcProgress;
40 import java.io.InputStream;
41 import java.util.concurrent.TimeUnit;
42 import java.util.logging.Level;
43 import java.util.logging.Logger;
44 import javax.annotation.Nullable;
45 
46 /**
47  * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
48  * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
49  * from the sending application thread.
50  */
51 public abstract class AbstractClientStream extends AbstractStream
52     implements ClientStream, MessageFramer.Sink {
53 
54   private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
55 
56   /**
57    * A sink for outbound operations, separated from the stream simply to avoid name
58    * collisions/confusion. Only called from application thread.
59    */
60   protected interface Sink {
61     /**
62      * Sends the request headers to the remote end point.
63      *
64      * @param metadata the metadata to be sent
65      * @param payload the payload needs to be sent in the headers if not null. Should only be used
66      *     when sending an unary GET request
67      */
writeHeaders(Metadata metadata, @Nullable byte[] payload)68     void writeHeaders(Metadata metadata, @Nullable byte[] payload);
69 
70     /**
71      * Sends an outbound frame to the remote end point.
72      *
73      * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
74      *     endOfStream} with no data to send
75      * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
76      *     {@code true} if this is {@code true}
77      * @param flush {@code true} if more data may not be arriving soon
78      * @param numMessages the number of messages this series of frames represents, must be >= 0.
79      */
writeFrame( @ullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)80     void writeFrame(
81         @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
82 
83     /**
84      * Tears down the stream, typically in the event of a timeout. This method may be called
85      * multiple times and from any thread.
86      *
87      * <p>This is a clone of {@link ClientStream#cancel(Status)};
88      * {@link AbstractClientStream#cancel} delegates to this method.
89      */
cancel(Status status)90     void cancel(Status status);
91   }
92 
93   private final TransportTracer transportTracer;
94   private final Framer framer;
95   private boolean shouldBeCountedForInUse;
96   private boolean useGet;
97   private Metadata headers;
98   /**
99    * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
100    * cancel() being called and isReady() beginning to return false, since cancel is commonly
101    * processed asynchronously.
102    */
103   private volatile boolean cancelled;
104 
AbstractClientStream( WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, Metadata headers, CallOptions callOptions, boolean useGet)105   protected AbstractClientStream(
106       WritableBufferAllocator bufferAllocator,
107       StatsTraceContext statsTraceCtx,
108       TransportTracer transportTracer,
109       Metadata headers,
110       CallOptions callOptions,
111       boolean useGet) {
112     checkNotNull(headers, "headers");
113     this.transportTracer = checkNotNull(transportTracer, "transportTracer");
114     this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
115     this.useGet = useGet;
116     if (!useGet) {
117       framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
118       this.headers = headers;
119     } else {
120       framer = new GetFramer(headers, statsTraceCtx);
121     }
122   }
123 
124   @Override
setDeadline(Deadline deadline)125   public void setDeadline(Deadline deadline) {
126     headers.discardAll(TIMEOUT_KEY);
127     long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
128     headers.put(TIMEOUT_KEY, effectiveTimeout);
129   }
130 
131   @Override
setMaxOutboundMessageSize(int maxSize)132   public void setMaxOutboundMessageSize(int maxSize) {
133     framer.setMaxOutboundMessageSize(maxSize);
134   }
135 
136   @Override
setMaxInboundMessageSize(int maxSize)137   public void setMaxInboundMessageSize(int maxSize) {
138     transportState().setMaxInboundMessageSize(maxSize);
139   }
140 
141   @Override
setFullStreamDecompression(boolean fullStreamDecompression)142   public final void setFullStreamDecompression(boolean fullStreamDecompression) {
143     transportState().setFullStreamDecompression(fullStreamDecompression);
144   }
145 
146   @Override
setDecompressorRegistry(DecompressorRegistry decompressorRegistry)147   public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
148     transportState().setDecompressorRegistry(decompressorRegistry);
149   }
150 
151   /** {@inheritDoc} */
152   @Override
transportState()153   protected abstract TransportState transportState();
154 
155   @Override
start(ClientStreamListener listener)156   public final void start(ClientStreamListener listener) {
157     transportState().setListener(listener);
158     if (!useGet) {
159       abstractClientStreamSink().writeHeaders(headers, null);
160       headers = null;
161     }
162   }
163 
164   /**
165    * Sink for transport to be called to perform outbound operations. Each stream must have its own
166    * unique sink.
167    */
abstractClientStreamSink()168   protected abstract Sink abstractClientStreamSink();
169 
170   @Override
framer()171   protected final Framer framer() {
172     return framer;
173   }
174 
175   /**
176    * Returns true if this stream should be counted when determining the in-use state of the
177    * transport.
178    */
shouldBeCountedForInUse()179   public final boolean shouldBeCountedForInUse() {
180     return shouldBeCountedForInUse;
181   }
182 
183   @Override
deliverFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)184   public final void deliverFrame(
185       WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
186     Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
187     abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
188   }
189 
190   @Override
halfClose()191   public final void halfClose() {
192     if (!transportState().isOutboundClosed()) {
193       transportState().setOutboundClosed();
194       endOfMessages();
195     }
196   }
197 
198   @Override
cancel(Status reason)199   public final void cancel(Status reason) {
200     Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
201     cancelled = true;
202     abstractClientStreamSink().cancel(reason);
203   }
204 
205   @Override
isReady()206   public final boolean isReady() {
207     return super.isReady() && !cancelled;
208   }
209 
210   @Override
appendTimeoutInsight(InsightBuilder insight)211   public final void appendTimeoutInsight(InsightBuilder insight) {
212     Attributes attrs = getAttributes();
213     insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
214   }
215 
getTransportTracer()216   protected TransportTracer getTransportTracer() {
217     return transportTracer;
218   }
219 
220   /** This should only be called from the transport thread. */
221   protected abstract static class TransportState extends AbstractStream.TransportState {
222     /** Whether listener.closed() has been called. */
223     private final StatsTraceContext statsTraceCtx;
224     private boolean listenerClosed;
225     private ClientStreamListener listener;
226     private boolean fullStreamDecompression;
227     private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
228 
229     private boolean deframerClosed = false;
230     private Runnable deframerClosedTask;
231 
232     /** Whether the client has half-closed the stream. */
233     private volatile boolean outboundClosed;
234 
235     /**
236      * Whether the stream is closed from the transport's perspective. This can differ from {@link
237      * #listenerClosed} because there may still be messages buffered to deliver to the application.
238      */
239     private boolean statusReported;
240     /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
241     private boolean statusReportedIsOk;
242 
TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)243     protected TransportState(
244         int maxMessageSize,
245         StatsTraceContext statsTraceCtx,
246         TransportTracer transportTracer) {
247       super(maxMessageSize, statsTraceCtx, transportTracer);
248       this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
249     }
250 
setFullStreamDecompression(boolean fullStreamDecompression)251     private void setFullStreamDecompression(boolean fullStreamDecompression) {
252       this.fullStreamDecompression = fullStreamDecompression;
253     }
254 
setDecompressorRegistry(DecompressorRegistry decompressorRegistry)255     private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
256       checkState(this.listener == null, "Already called start");
257       this.decompressorRegistry =
258           checkNotNull(decompressorRegistry, "decompressorRegistry");
259     }
260 
261     @VisibleForTesting
setListener(ClientStreamListener listener)262     public final void setListener(ClientStreamListener listener) {
263       checkState(this.listener == null, "Already called setListener");
264       this.listener = checkNotNull(listener, "listener");
265     }
266 
267     @Override
deframerClosed(boolean hasPartialMessage)268     public void deframerClosed(boolean hasPartialMessage) {
269       checkState(statusReported, "status should have been reported on deframer closed");
270       deframerClosed = true;
271       if (statusReportedIsOk && hasPartialMessage) {
272         transportReportStatus(
273             Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
274             true,
275             new Metadata());
276       }
277       if (deframerClosedTask != null) {
278         deframerClosedTask.run();
279         deframerClosedTask = null;
280       }
281     }
282 
283     @Override
listener()284     protected final ClientStreamListener listener() {
285       return listener;
286     }
287 
setOutboundClosed()288     private final void setOutboundClosed() {
289       outboundClosed = true;
290     }
291 
isOutboundClosed()292     protected final boolean isOutboundClosed() {
293       return outboundClosed;
294     }
295 
296     /**
297      * Called by transport implementations when they receive headers.
298      *
299      * @param headers the parsed headers
300      */
inboundHeadersReceived(Metadata headers)301     protected void inboundHeadersReceived(Metadata headers) {
302       checkState(!statusReported, "Received headers on closed stream");
303       statsTraceCtx.clientInboundHeaders();
304 
305       boolean compressedStream = false;
306       String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
307       if (fullStreamDecompression && streamEncoding != null) {
308         if (streamEncoding.equalsIgnoreCase("gzip")) {
309           setFullStreamDecompressor(new GzipInflatingBuffer());
310           compressedStream = true;
311         } else if (!streamEncoding.equalsIgnoreCase("identity")) {
312           deframeFailed(
313               Status.INTERNAL
314                   .withDescription(
315                       String.format("Can't find full stream decompressor for %s", streamEncoding))
316                   .asRuntimeException());
317           return;
318         }
319       }
320 
321       String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
322       if (messageEncoding != null) {
323         Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
324         if (decompressor == null) {
325           deframeFailed(
326               Status.INTERNAL
327                   .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
328                   .asRuntimeException());
329           return;
330         } else if (decompressor != Codec.Identity.NONE) {
331           if (compressedStream) {
332             deframeFailed(
333                 Status.INTERNAL
334                     .withDescription("Full stream and gRPC message encoding cannot both be set")
335                     .asRuntimeException());
336             return;
337           }
338           setDecompressor(decompressor);
339         }
340       }
341 
342       listener().headersRead(headers);
343     }
344 
345     /**
346      * Processes the contents of a received data frame from the server.
347      *
348      * @param frame the received data frame. Its ownership is transferred to this method.
349      */
inboundDataReceived(ReadableBuffer frame)350     protected void inboundDataReceived(ReadableBuffer frame) {
351       checkNotNull(frame, "frame");
352       boolean needToCloseFrame = true;
353       try {
354         if (statusReported) {
355           log.log(Level.INFO, "Received data on closed stream");
356           return;
357         }
358 
359         needToCloseFrame = false;
360         deframe(frame);
361       } finally {
362         if (needToCloseFrame) {
363           frame.close();
364         }
365       }
366     }
367 
368     /**
369      * Processes the trailers and status from the server.
370      *
371      * @param trailers the received trailers
372      * @param status the status extracted from the trailers
373      */
inboundTrailersReceived(Metadata trailers, Status status)374     protected void inboundTrailersReceived(Metadata trailers, Status status) {
375       checkNotNull(status, "status");
376       checkNotNull(trailers, "trailers");
377       if (statusReported) {
378         log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
379             new Object[]{status, trailers});
380         return;
381       }
382       statsTraceCtx.clientInboundTrailers(trailers);
383       transportReportStatus(status, false, trailers);
384     }
385 
386     /**
387      * Report stream closure with status to the application layer if not already reported. This
388      * method must be called from the transport thread.
389      *
390      * @param status the new status to set
391      * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
392      *        may already be queued up in the deframer. If {@code false}, the listener will be
393      *        notified immediately after all currently completed messages in the deframer have been
394      *        delivered to the application.
395      * @param trailers new instance of {@code Trailers}, either empty or those returned by the
396      *        server
397      */
transportReportStatus(final Status status, boolean stopDelivery, final Metadata trailers)398     public final void transportReportStatus(final Status status, boolean stopDelivery,
399         final Metadata trailers) {
400       transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
401     }
402 
403     /**
404      * Report stream closure with status to the application layer if not already reported. This
405      * method must be called from the transport thread.
406      *
407      * @param status the new status to set
408      * @param rpcProgress RPC progress that the
409      *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
410      *        will receive
411      * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
412      *        may already be queued up in the deframer and overrides any previously queued status.
413      *        If {@code false}, the listener will be notified immediately after all currently
414      *        completed messages in the deframer have been delivered to the application.
415      * @param trailers new instance of {@code Trailers}, either empty or those returned by the
416      *        server
417      */
transportReportStatus( final Status status, final RpcProgress rpcProgress, boolean stopDelivery, final Metadata trailers)418     public final void transportReportStatus(
419         final Status status,
420         final RpcProgress rpcProgress,
421         boolean stopDelivery,
422         final Metadata trailers) {
423       checkNotNull(status, "status");
424       checkNotNull(trailers, "trailers");
425       // If stopDelivery, we continue in case previous invocation is waiting for stall
426       if (statusReported && !stopDelivery) {
427         return;
428       }
429       statusReported = true;
430       statusReportedIsOk = status.isOk();
431       onStreamDeallocated();
432 
433       if (deframerClosed) {
434         deframerClosedTask = null;
435         closeListener(status, rpcProgress, trailers);
436       } else {
437         deframerClosedTask =
438             new Runnable() {
439               @Override
440               public void run() {
441                 closeListener(status, rpcProgress, trailers);
442               }
443             };
444         closeDeframer(stopDelivery);
445       }
446     }
447 
448     /**
449      * Closes the listener if not previously closed.
450      *
451      * @throws IllegalStateException if the call has not yet been started.
452      */
closeListener( Status status, RpcProgress rpcProgress, Metadata trailers)453     private void closeListener(
454         Status status, RpcProgress rpcProgress, Metadata trailers) {
455       if (!listenerClosed) {
456         listenerClosed = true;
457         statsTraceCtx.streamClosed(status);
458         listener().closed(status, rpcProgress, trailers);
459         if (getTransportTracer() != null) {
460           getTransportTracer().reportStreamClosed(status.isOk());
461         }
462       }
463     }
464   }
465 
466   private class GetFramer implements Framer {
467     private Metadata headers;
468     private boolean closed;
469     private final StatsTraceContext statsTraceCtx;
470     private byte[] payload;
471 
GetFramer(Metadata headers, StatsTraceContext statsTraceCtx)472     public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
473       this.headers = checkNotNull(headers, "headers");
474       this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
475     }
476 
477     @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
478     @Override
writePayload(InputStream message)479     public void writePayload(InputStream message) {
480       checkState(payload == null, "writePayload should not be called multiple times");
481       try {
482         payload = ByteStreams.toByteArray(message);
483       } catch (java.io.IOException ex) {
484         throw new RuntimeException(ex);
485       }
486       statsTraceCtx.outboundMessage(0);
487       statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
488       statsTraceCtx.outboundUncompressedSize(payload.length);
489       // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
490       // it using e.g., base64.  However, we are not supposed to know such detail here.
491       //
492       // We don't want to move this line to where the encoding happens either, because we'd better
493       // contain the message stats reporting in Framer as suggested in StatsTraceContext.
494       // Scattering the reporting sites increases the risk of mis-counting or double-counting.
495       //
496       // Because the payload is usually very small, people shouldn't care about the size difference
497       // caused by encoding.
498       statsTraceCtx.outboundWireSize(payload.length);
499     }
500 
501     @Override
flush()502     public void flush() {}
503 
504     @Override
isClosed()505     public boolean isClosed() {
506       return closed;
507     }
508 
509     /** Closes, with flush. */
510     @Override
close()511     public void close() {
512       closed = true;
513       checkState(payload != null,
514           "Lack of request message. GET request is only supported for unary requests");
515       abstractClientStreamSink().writeHeaders(headers, payload);
516       payload = null;
517       headers = null;
518     }
519 
520     /** Closes, without flush. */
521     @Override
dispose()522     public void dispose() {
523       closed = true;
524       payload = null;
525       headers = null;
526     }
527 
528     // Compression is not supported for GET encoding.
529     @Override
setMessageCompression(boolean enable)530     public Framer setMessageCompression(boolean enable) {
531       return this;
532     }
533 
534     @Override
setCompressor(Compressor compressor)535     public Framer setCompressor(Compressor compressor) {
536       return this;
537     }
538 
539     // TODO(zsurocking): support this
540     @Override
setMaxOutboundMessageSize(int maxSize)541     public void setMaxOutboundMessageSize(int maxSize) {}
542   }
543 }
544