• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23 import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24 import static java.lang.Math.max;
25 
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Preconditions;
28 import io.grpc.Codec;
29 import io.grpc.Compressor;
30 import io.grpc.Deadline;
31 import io.grpc.Decompressor;
32 import io.grpc.DecompressorRegistry;
33 import io.grpc.Metadata;
34 import io.grpc.Status;
35 import io.grpc.internal.ClientStreamListener.RpcProgress;
36 import java.io.InputStream;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40 import javax.annotation.Nullable;
41 
42 /**
43  * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
44  * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
45  * from the sending application thread.
46  */
47 public abstract class AbstractClientStream extends AbstractStream
48     implements ClientStream, MessageFramer.Sink {
49 
50   private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
51 
52   /**
53    * A sink for outbound operations, separated from the stream simply to avoid name
54    * collisions/confusion. Only called from application thread.
55    */
56   protected interface Sink {
57     /**
58      * Sends the request headers to the remote end point.
59      *
60      * @param metadata the metadata to be sent
61      * @param payload the payload needs to be sent in the headers if not null. Should only be used
62      *     when sending an unary GET request
63      */
writeHeaders(Metadata metadata, @Nullable byte[] payload)64     void writeHeaders(Metadata metadata, @Nullable byte[] payload);
65 
66     /**
67      * Sends an outbound frame to the remote end point.
68      *
69      * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
70      *     endOfStream} with no data to send
71      * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
72      *     {@code true} if this is {@code true}
73      * @param flush {@code true} if more data may not be arriving soon
74      * @Param numMessages the number of messages this series of frames represents, must be >= 0.
75      */
writeFrame( @ullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)76     void writeFrame(
77         @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
78 
79     /**
80      * Requests up to the given number of messages from the call to be delivered to the client. This
81      * should end up triggering {@link TransportState#requestMessagesFromDeframer(int)} on the
82      * transport thread.
83      */
request(int numMessages)84     void request(int numMessages);
85 
86     /**
87      * Tears down the stream, typically in the event of a timeout. This method may be called
88      * multiple times and from any thread.
89      *
90      * <p>This is a clone of {@link ClientStream#cancel(Status)};
91      * {@link AbstractClientStream#cancel} delegates to this method.
92      */
cancel(Status status)93     void cancel(Status status);
94   }
95 
96   private final TransportTracer transportTracer;
97   private final Framer framer;
98   private boolean useGet;
99   private Metadata headers;
100   /**
101    * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
102    * cancel() being called and isReady() beginning to return false, since cancel is commonly
103    * processed asynchronously.
104    */
105   private volatile boolean cancelled;
106 
AbstractClientStream( WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, Metadata headers, boolean useGet)107   protected AbstractClientStream(
108       WritableBufferAllocator bufferAllocator,
109       StatsTraceContext statsTraceCtx,
110       TransportTracer transportTracer,
111       Metadata headers,
112       boolean useGet) {
113     checkNotNull(headers, "headers");
114     this.transportTracer = checkNotNull(transportTracer, "transportTracer");
115     this.useGet = useGet;
116     if (!useGet) {
117       framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
118       this.headers = headers;
119     } else {
120       framer = new GetFramer(headers, statsTraceCtx);
121     }
122   }
123 
124   @Override
setDeadline(Deadline deadline)125   public void setDeadline(Deadline deadline) {
126     headers.discardAll(TIMEOUT_KEY);
127     long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
128     headers.put(TIMEOUT_KEY, effectiveTimeout);
129   }
130 
131   @Override
setMaxOutboundMessageSize(int maxSize)132   public void setMaxOutboundMessageSize(int maxSize) {
133     framer.setMaxOutboundMessageSize(maxSize);
134   }
135 
136   @Override
setMaxInboundMessageSize(int maxSize)137   public void setMaxInboundMessageSize(int maxSize) {
138     transportState().setMaxInboundMessageSize(maxSize);
139   }
140 
141   @Override
setFullStreamDecompression(boolean fullStreamDecompression)142   public final void setFullStreamDecompression(boolean fullStreamDecompression) {
143     transportState().setFullStreamDecompression(fullStreamDecompression);
144   }
145 
146   @Override
setDecompressorRegistry(DecompressorRegistry decompressorRegistry)147   public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
148     transportState().setDecompressorRegistry(decompressorRegistry);
149   }
150 
151   /** {@inheritDoc} */
152   @Override
transportState()153   protected abstract TransportState transportState();
154 
155   @Override
start(ClientStreamListener listener)156   public final void start(ClientStreamListener listener) {
157     transportState().setListener(listener);
158     if (!useGet) {
159       abstractClientStreamSink().writeHeaders(headers, null);
160       headers = null;
161     }
162   }
163 
164   /**
165    * Sink for transport to be called to perform outbound operations. Each stream must have its own
166    * unique sink.
167    */
abstractClientStreamSink()168   protected abstract Sink abstractClientStreamSink();
169 
170   @Override
framer()171   protected final Framer framer() {
172     return framer;
173   }
174 
175   @Override
request(int numMessages)176   public final void request(int numMessages) {
177     abstractClientStreamSink().request(numMessages);
178   }
179 
180   @Override
deliverFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)181   public final void deliverFrame(
182       WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
183     Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
184     abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
185   }
186 
187   @Override
halfClose()188   public final void halfClose() {
189     if (!transportState().isOutboundClosed()) {
190       transportState().setOutboundClosed();
191       endOfMessages();
192     }
193   }
194 
195   @Override
cancel(Status reason)196   public final void cancel(Status reason) {
197     Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
198     cancelled = true;
199     abstractClientStreamSink().cancel(reason);
200   }
201 
202   @Override
isReady()203   public final boolean isReady() {
204     return super.isReady() && !cancelled;
205   }
206 
getTransportTracer()207   protected TransportTracer getTransportTracer() {
208     return transportTracer;
209   }
210 
211   /** This should only called from the transport thread. */
212   protected abstract static class TransportState extends AbstractStream.TransportState {
213     /** Whether listener.closed() has been called. */
214     private final StatsTraceContext statsTraceCtx;
215     private boolean listenerClosed;
216     private ClientStreamListener listener;
217     private boolean fullStreamDecompression;
218     private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
219 
220     private boolean deframerClosed = false;
221     private Runnable deframerClosedTask;
222 
223     /** Whether the client has half-closed the stream. */
224     private volatile boolean outboundClosed;
225 
226     /**
227      * Whether the stream is closed from the transport's perspective. This can differ from {@link
228      * #listenerClosed} because there may still be messages buffered to deliver to the application.
229      */
230     private boolean statusReported;
231     private Metadata trailers;
232     private Status trailerStatus;
233 
TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)234     protected TransportState(
235         int maxMessageSize,
236         StatsTraceContext statsTraceCtx,
237         TransportTracer transportTracer) {
238       super(maxMessageSize, statsTraceCtx, transportTracer);
239       this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
240     }
241 
setFullStreamDecompression(boolean fullStreamDecompression)242     private void setFullStreamDecompression(boolean fullStreamDecompression) {
243       this.fullStreamDecompression = fullStreamDecompression;
244     }
245 
setDecompressorRegistry(DecompressorRegistry decompressorRegistry)246     private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
247       checkState(this.listener == null, "Already called start");
248       this.decompressorRegistry =
249           checkNotNull(decompressorRegistry, "decompressorRegistry");
250     }
251 
252     @VisibleForTesting
setListener(ClientStreamListener listener)253     public final void setListener(ClientStreamListener listener) {
254       checkState(this.listener == null, "Already called setListener");
255       this.listener = checkNotNull(listener, "listener");
256     }
257 
258     @Override
deframerClosed(boolean hasPartialMessage)259     public void deframerClosed(boolean hasPartialMessage) {
260       deframerClosed = true;
261 
262       if (trailerStatus != null) {
263         if (trailerStatus.isOk() && hasPartialMessage) {
264           trailerStatus = Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame");
265           trailers = new Metadata();
266         }
267         transportReportStatus(trailerStatus, false, trailers);
268       } else {
269         checkState(statusReported, "status should have been reported on deframer closed");
270       }
271 
272       if (deframerClosedTask != null) {
273         deframerClosedTask.run();
274         deframerClosedTask = null;
275       }
276     }
277 
278     @Override
listener()279     protected final ClientStreamListener listener() {
280       return listener;
281     }
282 
setOutboundClosed()283     private final void setOutboundClosed() {
284       outboundClosed = true;
285     }
286 
isOutboundClosed()287     protected final boolean isOutboundClosed() {
288       return outboundClosed;
289     }
290 
291     /**
292      * Called by transport implementations when they receive headers.
293      *
294      * @param headers the parsed headers
295      */
inboundHeadersReceived(Metadata headers)296     protected void inboundHeadersReceived(Metadata headers) {
297       checkState(!statusReported, "Received headers on closed stream");
298       statsTraceCtx.clientInboundHeaders();
299 
300       boolean compressedStream = false;
301       String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
302       if (fullStreamDecompression && streamEncoding != null) {
303         if (streamEncoding.equalsIgnoreCase("gzip")) {
304           setFullStreamDecompressor(new GzipInflatingBuffer());
305           compressedStream = true;
306         } else if (!streamEncoding.equalsIgnoreCase("identity")) {
307           deframeFailed(
308               Status.INTERNAL
309                   .withDescription(
310                       String.format("Can't find full stream decompressor for %s", streamEncoding))
311                   .asRuntimeException());
312           return;
313         }
314       }
315 
316       String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
317       if (messageEncoding != null) {
318         Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
319         if (decompressor == null) {
320           deframeFailed(
321               Status.INTERNAL
322                   .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
323                   .asRuntimeException());
324           return;
325         } else if (decompressor != Codec.Identity.NONE) {
326           if (compressedStream) {
327             deframeFailed(
328                 Status.INTERNAL
329                     .withDescription(
330                         String.format("Full stream and gRPC message encoding cannot both be set"))
331                     .asRuntimeException());
332             return;
333           }
334           setDecompressor(decompressor);
335         }
336       }
337 
338       listener().headersRead(headers);
339     }
340 
341     /**
342      * Processes the contents of a received data frame from the server.
343      *
344      * @param frame the received data frame. Its ownership is transferred to this method.
345      */
inboundDataReceived(ReadableBuffer frame)346     protected void inboundDataReceived(ReadableBuffer frame) {
347       checkNotNull(frame, "frame");
348       boolean needToCloseFrame = true;
349       try {
350         if (statusReported) {
351           log.log(Level.INFO, "Received data on closed stream");
352           return;
353         }
354 
355         needToCloseFrame = false;
356         deframe(frame);
357       } finally {
358         if (needToCloseFrame) {
359           frame.close();
360         }
361       }
362     }
363 
364     /**
365      * Processes the trailers and status from the server.
366      *
367      * @param trailers the received trailers
368      * @param status the status extracted from the trailers
369      */
inboundTrailersReceived(Metadata trailers, Status status)370     protected void inboundTrailersReceived(Metadata trailers, Status status) {
371       checkNotNull(status, "status");
372       checkNotNull(trailers, "trailers");
373       if (statusReported) {
374         log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
375             new Object[]{status, trailers});
376         return;
377       }
378       this.trailers = trailers;
379       trailerStatus = status;
380       closeDeframer(false);
381     }
382 
383     /**
384      * Report stream closure with status to the application layer if not already reported. This
385      * method must be called from the transport thread.
386      *
387      * @param status the new status to set
388      * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
389      *        may already be queued up in the deframer. If {@code false}, the listener will be
390      *        notified immediately after all currently completed messages in the deframer have been
391      *        delivered to the application.
392      * @param trailers new instance of {@code Trailers}, either empty or those returned by the
393      *        server
394      */
transportReportStatus(final Status status, boolean stopDelivery, final Metadata trailers)395     public final void transportReportStatus(final Status status, boolean stopDelivery,
396         final Metadata trailers) {
397       transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
398     }
399 
400     /**
401      * Report stream closure with status to the application layer if not already reported. This
402      * method must be called from the transport thread.
403      *
404      * @param status the new status to set
405      * @param rpcProgress RPC progress that the
406      *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
407      *        will receive
408      * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
409      *        may already be queued up in the deframer. If {@code false}, the listener will be
410      *        notified immediately after all currently completed messages in the deframer have been
411      *        delivered to the application.
412      * @param trailers new instance of {@code Trailers}, either empty or those returned by the
413      *        server
414      */
transportReportStatus( final Status status, final RpcProgress rpcProgress, boolean stopDelivery, final Metadata trailers)415     public final void transportReportStatus(
416         final Status status, final RpcProgress rpcProgress, boolean stopDelivery,
417         final Metadata trailers) {
418       checkNotNull(status, "status");
419       checkNotNull(trailers, "trailers");
420       // If stopDelivery, we continue in case previous invocation is waiting for stall
421       if (statusReported && !stopDelivery) {
422         return;
423       }
424       statusReported = true;
425       onStreamDeallocated();
426 
427       if (deframerClosed) {
428         deframerClosedTask = null;
429         closeListener(status, rpcProgress, trailers);
430       } else {
431         deframerClosedTask =
432             new Runnable() {
433               @Override
434               public void run() {
435                 closeListener(status, rpcProgress, trailers);
436               }
437             };
438         closeDeframer(stopDelivery);
439       }
440     }
441 
442     /**
443      * Closes the listener if not previously closed.
444      *
445      * @throws IllegalStateException if the call has not yet been started.
446      */
closeListener( Status status, RpcProgress rpcProgress, Metadata trailers)447     private void closeListener(
448         Status status, RpcProgress rpcProgress, Metadata trailers) {
449       if (!listenerClosed) {
450         listenerClosed = true;
451         statsTraceCtx.streamClosed(status);
452         listener().closed(status, rpcProgress, trailers);
453         if (getTransportTracer() != null) {
454           getTransportTracer().reportStreamClosed(status.isOk());
455         }
456       }
457     }
458   }
459 
460   private class GetFramer implements Framer {
461     private Metadata headers;
462     private boolean closed;
463     private final StatsTraceContext statsTraceCtx;
464     private byte[] payload;
465 
GetFramer(Metadata headers, StatsTraceContext statsTraceCtx)466     public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
467       this.headers = checkNotNull(headers, "headers");
468       this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
469     }
470 
471     @Override
writePayload(InputStream message)472     public void writePayload(InputStream message) {
473       checkState(payload == null, "writePayload should not be called multiple times");
474       try {
475         payload = IoUtils.toByteArray(message);
476       } catch (java.io.IOException ex) {
477         throw new RuntimeException(ex);
478       }
479       statsTraceCtx.outboundMessage(0);
480       statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
481       statsTraceCtx.outboundUncompressedSize(payload.length);
482       // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
483       // it using e.g., base64.  However, we are not supposed to know such detail here.
484       //
485       // We don't want to move this line to where the encoding happens either, because we'd better
486       // contain the message stats reporting in Framer as suggested in StatsTraceContext.
487       // Scattering the reporting sites increases the risk of mis-counting or double-counting.
488       //
489       // Because the payload is usually very small, people shouldn't care about the size difference
490       // caused by encoding.
491       statsTraceCtx.outboundWireSize(payload.length);
492     }
493 
494     @Override
flush()495     public void flush() {}
496 
497     @Override
isClosed()498     public boolean isClosed() {
499       return closed;
500     }
501 
502     /** Closes, with flush. */
503     @Override
close()504     public void close() {
505       closed = true;
506       checkState(payload != null,
507           "Lack of request message. GET request is only supported for unary requests");
508       abstractClientStreamSink().writeHeaders(headers, payload);
509       payload = null;
510       headers = null;
511     }
512 
513     /** Closes, without flush. */
514     @Override
dispose()515     public void dispose() {
516       closed = true;
517       payload = null;
518       headers = null;
519     }
520 
521     // Compression is not supported for GET encoding.
522     @Override
setMessageCompression(boolean enable)523     public Framer setMessageCompression(boolean enable) {
524       return this;
525     }
526 
527     @Override
setCompressor(Compressor compressor)528     public Framer setCompressor(Compressor compressor) {
529       return this;
530     }
531 
532     // TODO(zsurocking): support this
533     @Override
setMaxOutboundMessageSize(int maxSize)534     public void setMaxOutboundMessageSize(int maxSize) {}
535   }
536 }
537