• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static io.grpc.Contexts.statusFromCancelled;
24 import static io.grpc.Status.DEADLINE_EXCEEDED;
25 import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
26 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
27 import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
28 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
29 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
30 import static java.lang.Math.max;
31 
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.MoreObjects;
34 import io.grpc.Attributes;
35 import io.grpc.CallOptions;
36 import io.grpc.ClientCall;
37 import io.grpc.ClientStreamTracer;
38 import io.grpc.Codec;
39 import io.grpc.Compressor;
40 import io.grpc.CompressorRegistry;
41 import io.grpc.Context;
42 import io.grpc.Context.CancellationListener;
43 import io.grpc.Deadline;
44 import io.grpc.DecompressorRegistry;
45 import io.grpc.InternalConfigSelector;
46 import io.grpc.InternalDecompressorRegistry;
47 import io.grpc.Metadata;
48 import io.grpc.MethodDescriptor;
49 import io.grpc.MethodDescriptor.MethodType;
50 import io.grpc.Status;
51 import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
52 import io.perfmark.Link;
53 import io.perfmark.PerfMark;
54 import io.perfmark.Tag;
55 import io.perfmark.TaskCloseable;
56 import java.io.InputStream;
57 import java.nio.charset.Charset;
58 import java.util.Locale;
59 import java.util.concurrent.CancellationException;
60 import java.util.concurrent.Executor;
61 import java.util.concurrent.ScheduledExecutorService;
62 import java.util.concurrent.ScheduledFuture;
63 import java.util.concurrent.TimeUnit;
64 import java.util.logging.Level;
65 import java.util.logging.Logger;
66 import javax.annotation.Nullable;
67 
68 /**
69  * Implementation of {@link ClientCall}.
70  */
71 final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
72 
73   private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
74   private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
75       = "gzip".getBytes(Charset.forName("US-ASCII"));
76   private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1);
77 
78   private final MethodDescriptor<ReqT, RespT> method;
79   private final Tag tag;
80   private final Executor callExecutor;
81   private final boolean callExecutorIsDirect;
82   private final CallTracer channelCallsTracer;
83   private final Context context;
84   private volatile ScheduledFuture<?> deadlineCancellationFuture;
85   private final boolean unaryRequest;
86   private CallOptions callOptions;
87   private ClientStream stream;
88   private volatile boolean cancelListenersShouldBeRemoved;
89   private boolean cancelCalled;
90   private boolean halfCloseCalled;
91   private final ClientStreamProvider clientStreamProvider;
92   private final ContextCancellationListener cancellationListener =
93       new ContextCancellationListener();
94   private final ScheduledExecutorService deadlineCancellationExecutor;
95   private boolean fullStreamDecompression;
96   private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
97   private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
98 
ClientCallImpl( MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, ClientStreamProvider clientStreamProvider, ScheduledExecutorService deadlineCancellationExecutor, CallTracer channelCallsTracer, @Nullable InternalConfigSelector configSelector)99   ClientCallImpl(
100       MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
101       ClientStreamProvider clientStreamProvider,
102       ScheduledExecutorService deadlineCancellationExecutor,
103       CallTracer channelCallsTracer,
104       // TODO(zdapeng): remove this arg
105       @Nullable InternalConfigSelector configSelector) {
106     this.method = method;
107     // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
108     this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
109     // If we know that the executor is a direct executor, we don't need to wrap it with a
110     // SerializingExecutor. This is purely for performance reasons.
111     // See https://github.com/grpc/grpc-java/issues/368
112     if (executor == directExecutor()) {
113       this.callExecutor = new SerializeReentrantCallsDirectExecutor();
114       callExecutorIsDirect = true;
115     } else {
116       this.callExecutor = new SerializingExecutor(executor);
117       callExecutorIsDirect = false;
118     }
119     this.channelCallsTracer = channelCallsTracer;
120     // Propagate the context from the thread which initiated the call to all callbacks.
121     this.context = Context.current();
122     this.unaryRequest = method.getType() == MethodType.UNARY
123         || method.getType() == MethodType.SERVER_STREAMING;
124     this.callOptions = callOptions;
125     this.clientStreamProvider = clientStreamProvider;
126     this.deadlineCancellationExecutor = deadlineCancellationExecutor;
127     PerfMark.event("ClientCall.<init>", tag);
128   }
129 
130   private final class ContextCancellationListener implements CancellationListener {
131     @Override
cancelled(Context context)132     public void cancelled(Context context) {
133       stream.cancel(statusFromCancelled(context));
134     }
135   }
136 
137   /**
138    * Provider of {@link ClientStream}s.
139    */
140   interface ClientStreamProvider {
newStream( MethodDescriptor<?, ?> method, CallOptions callOptions, Metadata headers, Context context)141     ClientStream newStream(
142         MethodDescriptor<?, ?> method,
143         CallOptions callOptions,
144         Metadata headers,
145         Context context);
146   }
147 
setFullStreamDecompression(boolean fullStreamDecompression)148   ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
149     this.fullStreamDecompression = fullStreamDecompression;
150     return this;
151   }
152 
setDecompressorRegistry(DecompressorRegistry decompressorRegistry)153   ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
154     this.decompressorRegistry = decompressorRegistry;
155     return this;
156   }
157 
setCompressorRegistry(CompressorRegistry compressorRegistry)158   ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
159     this.compressorRegistry = compressorRegistry;
160     return this;
161   }
162 
163   @VisibleForTesting
prepareHeaders( Metadata headers, DecompressorRegistry decompressorRegistry, Compressor compressor, boolean fullStreamDecompression)164   static void prepareHeaders(
165       Metadata headers,
166       DecompressorRegistry decompressorRegistry,
167       Compressor compressor,
168       boolean fullStreamDecompression) {
169     headers.discardAll(CONTENT_LENGTH_KEY);
170     headers.discardAll(MESSAGE_ENCODING_KEY);
171     if (compressor != Codec.Identity.NONE) {
172       headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
173     }
174 
175     headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
176     byte[] advertisedEncodings =
177         InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
178     if (advertisedEncodings.length != 0) {
179       headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
180     }
181 
182     headers.discardAll(CONTENT_ENCODING_KEY);
183     headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
184     if (fullStreamDecompression) {
185       headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
186     }
187   }
188 
189   @Override
start(Listener<RespT> observer, Metadata headers)190   public void start(Listener<RespT> observer, Metadata headers) {
191     try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) {
192       PerfMark.attachTag(tag);
193       startInternal(observer, headers);
194     }
195   }
196 
startInternal(Listener<RespT> observer, Metadata headers)197   private void startInternal(Listener<RespT> observer, Metadata headers) {
198     checkState(stream == null, "Already started");
199     checkState(!cancelCalled, "call was cancelled");
200     checkNotNull(observer, "observer");
201     checkNotNull(headers, "headers");
202 
203     if (context.isCancelled()) {
204       // Context is already cancelled so no need to create a real stream, just notify the observer
205       // of cancellation via callback on the executor
206       stream = NoopClientStream.INSTANCE;
207       final Listener<RespT> finalObserver = observer;
208       class ClosedByContext extends ContextRunnable {
209         ClosedByContext() {
210           super(context);
211         }
212 
213         @Override
214         public void runInContext() {
215           closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
216         }
217       }
218 
219       callExecutor.execute(new ClosedByContext());
220       return;
221     }
222     applyMethodConfig();
223     final String compressorName = callOptions.getCompressor();
224     Compressor compressor;
225     if (compressorName != null) {
226       compressor = compressorRegistry.lookupCompressor(compressorName);
227       if (compressor == null) {
228         stream = NoopClientStream.INSTANCE;
229         final Listener<RespT> finalObserver = observer;
230         class ClosedByNotFoundCompressor extends ContextRunnable {
231           ClosedByNotFoundCompressor() {
232             super(context);
233           }
234 
235           @Override
236           public void runInContext() {
237             closeObserver(
238                 finalObserver,
239                 Status.INTERNAL.withDescription(
240                     String.format("Unable to find compressor by name %s", compressorName)),
241                 new Metadata());
242           }
243         }
244 
245         callExecutor.execute(new ClosedByNotFoundCompressor());
246         return;
247       }
248     } else {
249       compressor = Codec.Identity.NONE;
250     }
251     prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
252 
253     Deadline effectiveDeadline = effectiveDeadline();
254     boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
255     if (!deadlineExceeded) {
256       logIfContextNarrowedTimeout(
257           effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
258       stream = clientStreamProvider.newStream(method, callOptions, headers, context);
259     } else {
260       ClientStreamTracer[] tracers =
261           GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
262       String deadlineName =
263           isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context";
264       String description = String.format(
265           "ClientCall started after %s deadline was exceeded .9%f seconds ago", deadlineName,
266           effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS);
267       stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
268     }
269 
270     if (callExecutorIsDirect) {
271       stream.optimizeForDirectExecutor();
272     }
273     if (callOptions.getAuthority() != null) {
274       stream.setAuthority(callOptions.getAuthority());
275     }
276     if (callOptions.getMaxInboundMessageSize() != null) {
277       stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
278     }
279     if (callOptions.getMaxOutboundMessageSize() != null) {
280       stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
281     }
282     if (effectiveDeadline != null) {
283       stream.setDeadline(effectiveDeadline);
284     }
285     stream.setCompressor(compressor);
286     if (fullStreamDecompression) {
287       stream.setFullStreamDecompression(fullStreamDecompression);
288     }
289     stream.setDecompressorRegistry(decompressorRegistry);
290     channelCallsTracer.reportCallStarted();
291     stream.start(new ClientStreamListenerImpl(observer));
292 
293     // Delay any sources of cancellation after start(), because most of the transports are broken if
294     // they receive cancel before start. Issue #1343 has more details
295 
296     // Propagate later Context cancellation to the remote side.
297     context.addListener(cancellationListener, directExecutor());
298     if (effectiveDeadline != null
299         // If the context has the effective deadline, we don't need to schedule an extra task.
300         && !effectiveDeadline.equals(context.getDeadline())
301         // If the channel has been terminated, we don't need to schedule an extra task.
302         && deadlineCancellationExecutor != null) {
303       deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
304     }
305     if (cancelListenersShouldBeRemoved) {
306       // Race detected! ClientStreamListener.closed may have been called before
307       // deadlineCancellationFuture was set / context listener added, thereby preventing the future
308       // and listener from being cancelled. Go ahead and cancel again, just to be sure it
309       // was cancelled.
310       removeContextListenerAndCancelDeadlineFuture();
311     }
312   }
313 
applyMethodConfig()314   private void applyMethodConfig() {
315     MethodInfo info = callOptions.getOption(MethodInfo.KEY);
316     if (info == null) {
317       return;
318     }
319     if (info.timeoutNanos != null) {
320       Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS);
321       Deadline existingDeadline = callOptions.getDeadline();
322       // If the new deadline is sooner than the existing deadline, swap them.
323       if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) {
324         callOptions = callOptions.withDeadline(newDeadline);
325       }
326     }
327     if (info.waitForReady != null) {
328       callOptions =
329           info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady();
330     }
331     if (info.maxInboundMessageSize != null) {
332       Integer existingLimit = callOptions.getMaxInboundMessageSize();
333       if (existingLimit != null) {
334         callOptions =
335             callOptions.withMaxInboundMessageSize(
336                 Math.min(existingLimit, info.maxInboundMessageSize));
337       } else {
338         callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize);
339       }
340     }
341     if (info.maxOutboundMessageSize != null) {
342       Integer existingLimit = callOptions.getMaxOutboundMessageSize();
343       if (existingLimit != null) {
344         callOptions =
345             callOptions.withMaxOutboundMessageSize(
346                 Math.min(existingLimit, info.maxOutboundMessageSize));
347       } else {
348         callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize);
349       }
350     }
351   }
352 
logIfContextNarrowedTimeout( Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, @Nullable Deadline callDeadline)353   private static void logIfContextNarrowedTimeout(
354       Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
355       @Nullable Deadline callDeadline) {
356     if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
357         || !effectiveDeadline.equals(outerCallDeadline)) {
358       return;
359     }
360 
361     long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
362     StringBuilder builder = new StringBuilder(String.format(
363         Locale.US,
364         "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
365     if (callDeadline == null) {
366       builder.append(" Explicit call timeout was not set.");
367     } else {
368       long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
369       builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
370     }
371 
372     log.fine(builder.toString());
373   }
374 
removeContextListenerAndCancelDeadlineFuture()375   private void removeContextListenerAndCancelDeadlineFuture() {
376     context.removeListener(cancellationListener);
377     ScheduledFuture<?> f = deadlineCancellationFuture;
378     if (f != null) {
379       f.cancel(false);
380     }
381   }
382 
383   private class DeadlineTimer implements Runnable {
384     private final long remainingNanos;
385 
DeadlineTimer(long remainingNanos)386     DeadlineTimer(long remainingNanos) {
387       this.remainingNanos = remainingNanos;
388     }
389 
390     @Override
run()391     public void run() {
392       InsightBuilder insight = new InsightBuilder();
393       stream.appendTimeoutInsight(insight);
394       // DelayedStream.cancel() is safe to call from a thread that is different from where the
395       // stream is created.
396       long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
397       long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
398 
399       StringBuilder buf = new StringBuilder();
400       buf.append("deadline exceeded after ");
401       if (remainingNanos < 0) {
402         buf.append('-');
403       }
404       buf.append(seconds);
405       buf.append(String.format(Locale.US, ".%09d", nanos));
406       buf.append("s. ");
407       buf.append(insight);
408       stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
409     }
410   }
411 
startDeadlineTimer(Deadline deadline)412   private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
413     long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
414     return deadlineCancellationExecutor.schedule(
415         new LogExceptionRunnable(
416             new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
417   }
418 
419   @Nullable
effectiveDeadline()420   private Deadline effectiveDeadline() {
421     // Call options and context are immutable, so we don't need to cache the deadline.
422     return min(callOptions.getDeadline(), context.getDeadline());
423   }
424 
425   @Nullable
min(@ullable Deadline deadline0, @Nullable Deadline deadline1)426   private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
427     if (deadline0 == null) {
428       return deadline1;
429     }
430     if (deadline1 == null) {
431       return deadline0;
432     }
433     return deadline0.minimum(deadline1);
434   }
435 
isFirstMin(@ullable Deadline deadline0, @Nullable Deadline deadline1)436   private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
437     if (deadline0 == null) {
438       return false;
439     }
440     if (deadline1 == null) {
441       return true;
442     }
443     return deadline0.isBefore(deadline1);
444   }
445 
446   @Override
request(int numMessages)447   public void request(int numMessages) {
448     try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
449       PerfMark.attachTag(tag);
450       checkState(stream != null, "Not started");
451       checkArgument(numMessages >= 0, "Number requested must be non-negative");
452       stream.request(numMessages);
453     }
454   }
455 
456   @Override
cancel(@ullable String message, @Nullable Throwable cause)457   public void cancel(@Nullable String message, @Nullable Throwable cause) {
458     try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) {
459       PerfMark.attachTag(tag);
460       cancelInternal(message, cause);
461     }
462   }
463 
cancelInternal(@ullable String message, @Nullable Throwable cause)464   private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
465     if (message == null && cause == null) {
466       cause = new CancellationException("Cancelled without a message or cause");
467       log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
468     }
469     if (cancelCalled) {
470       return;
471     }
472     cancelCalled = true;
473     try {
474       // Cancel is called in exception handling cases, so it may be the case that the
475       // stream was never successfully created or start has never been called.
476       if (stream != null) {
477         Status status = Status.CANCELLED;
478         if (message != null) {
479           status = status.withDescription(message);
480         } else {
481           status = status.withDescription("Call cancelled without message");
482         }
483         if (cause != null) {
484           status = status.withCause(cause);
485         }
486         stream.cancel(status);
487       }
488     } finally {
489       removeContextListenerAndCancelDeadlineFuture();
490     }
491   }
492 
493   @Override
halfClose()494   public void halfClose() {
495     try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) {
496       PerfMark.attachTag(tag);
497       halfCloseInternal();
498     }
499   }
500 
halfCloseInternal()501   private void halfCloseInternal() {
502     checkState(stream != null, "Not started");
503     checkState(!cancelCalled, "call was cancelled");
504     checkState(!halfCloseCalled, "call already half-closed");
505     halfCloseCalled = true;
506     stream.halfClose();
507   }
508 
509   @Override
sendMessage(ReqT message)510   public void sendMessage(ReqT message) {
511     try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) {
512       PerfMark.attachTag(tag);
513       sendMessageInternal(message);
514     }
515   }
516 
sendMessageInternal(ReqT message)517   private void sendMessageInternal(ReqT message) {
518     checkState(stream != null, "Not started");
519     checkState(!cancelCalled, "call was cancelled");
520     checkState(!halfCloseCalled, "call was half-closed");
521     try {
522       if (stream instanceof RetriableStream) {
523         @SuppressWarnings("unchecked")
524         RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
525         retriableStream.sendMessage(message);
526       } else {
527         stream.writeMessage(method.streamRequest(message));
528       }
529     } catch (RuntimeException e) {
530       stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
531       return;
532     } catch (Error e) {
533       stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
534       throw e;
535     }
536     // For unary requests, we don't flush since we know that halfClose should be coming soon. This
537     // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
538     // possibility of broken applications forgetting to call halfClose without noticing.
539     if (!unaryRequest) {
540       stream.flush();
541     }
542   }
543 
544   @Override
setMessageCompression(boolean enabled)545   public void setMessageCompression(boolean enabled) {
546     checkState(stream != null, "Not started");
547     stream.setMessageCompression(enabled);
548   }
549 
550   @Override
isReady()551   public boolean isReady() {
552     if (halfCloseCalled) {
553       return false;
554     }
555     return stream.isReady();
556   }
557 
558   @Override
getAttributes()559   public Attributes getAttributes() {
560     if (stream != null) {
561       return stream.getAttributes();
562     }
563     return Attributes.EMPTY;
564   }
565 
closeObserver(Listener<RespT> observer, Status status, Metadata trailers)566   private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
567     observer.onClose(status, trailers);
568   }
569 
570   @Override
toString()571   public String toString() {
572     return MoreObjects.toStringHelper(this).add("method", method).toString();
573   }
574 
575   private class ClientStreamListenerImpl implements ClientStreamListener {
576     private final Listener<RespT> observer;
577     private Status exceptionStatus;
578 
ClientStreamListenerImpl(Listener<RespT> observer)579     public ClientStreamListenerImpl(Listener<RespT> observer) {
580       this.observer = checkNotNull(observer, "observer");
581     }
582 
583     /**
584      * Cancels call and schedules onClose() notification. May only be called from the application
585      * thread.
586      */
exceptionThrown(Status status)587     private void exceptionThrown(Status status) {
588       // Since each RPC can have its own executor, we can only call onClose() when we are sure there
589       // will be no further callbacks. We set the status here and overwrite the onClose() details
590       // when it arrives.
591       exceptionStatus = status;
592       stream.cancel(status);
593     }
594 
595     @Override
headersRead(final Metadata headers)596     public void headersRead(final Metadata headers) {
597       try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) {
598         PerfMark.attachTag(tag);
599         final Link link = PerfMark.linkOut();
600         final class HeadersRead extends ContextRunnable {
601           HeadersRead() {
602             super(context);
603           }
604 
605           @Override
606           public void runInContext() {
607             try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) {
608               PerfMark.attachTag(tag);
609               PerfMark.linkIn(link);
610               runInternal();
611             }
612           }
613 
614           private void runInternal() {
615             if (exceptionStatus != null) {
616               return;
617             }
618             try {
619               observer.onHeaders(headers);
620             } catch (Throwable t) {
621               exceptionThrown(
622                   Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
623             }
624           }
625         }
626 
627         callExecutor.execute(new HeadersRead());
628       }
629     }
630 
631     @Override
messagesAvailable(final MessageProducer producer)632     public void messagesAvailable(final MessageProducer producer) {
633       try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) {
634         PerfMark.attachTag(tag);
635         final Link link = PerfMark.linkOut();
636         final class MessagesAvailable extends ContextRunnable {
637           MessagesAvailable() {
638             super(context);
639           }
640 
641           @Override
642           public void runInContext() {
643             try (TaskCloseable ignore =
644                      PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) {
645               PerfMark.attachTag(tag);
646               PerfMark.linkIn(link);
647               runInternal();
648             }
649           }
650 
651           private void runInternal() {
652             if (exceptionStatus != null) {
653               GrpcUtil.closeQuietly(producer);
654               return;
655             }
656             try {
657               InputStream message;
658               while ((message = producer.next()) != null) {
659                 try {
660                   observer.onMessage(method.parseResponse(message));
661                 } catch (Throwable t) {
662                   GrpcUtil.closeQuietly(message);
663                   throw t;
664                 }
665                 message.close();
666               }
667             } catch (Throwable t) {
668               GrpcUtil.closeQuietly(producer);
669               exceptionThrown(
670                   Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
671             }
672           }
673         }
674 
675         callExecutor.execute(new MessagesAvailable());
676       }
677     }
678 
679     @Override
closed(Status status, RpcProgress rpcProgress, Metadata trailers)680     public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
681       try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) {
682         PerfMark.attachTag(tag);
683         closedInternal(status, rpcProgress, trailers);
684       }
685     }
686 
closedInternal( Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers)687     private void closedInternal(
688         Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
689       Deadline deadline = effectiveDeadline();
690       if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
691         // When the server's deadline expires, it can only reset the stream with CANCEL and no
692         // description. Since our timer may be delayed in firing, we double-check the deadline and
693         // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
694         if (deadline.isExpired()) {
695           InsightBuilder insight = new InsightBuilder();
696           stream.appendTimeoutInsight(insight);
697           status = DEADLINE_EXCEEDED.augmentDescription(
698               "ClientCall was cancelled at or after deadline. " + insight);
699           // Replace trailers to prevent mixing sources of status and trailers.
700           trailers = new Metadata();
701         }
702       }
703       final Status savedStatus = status;
704       final Metadata savedTrailers = trailers;
705       final Link link = PerfMark.linkOut();
706       final class StreamClosed extends ContextRunnable {
707         StreamClosed() {
708           super(context);
709         }
710 
711         @Override
712         public void runInContext() {
713           try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) {
714             PerfMark.attachTag(tag);
715             PerfMark.linkIn(link);
716             runInternal();
717           }
718         }
719 
720         private void runInternal() {
721           Status status = savedStatus;
722           Metadata trailers = savedTrailers;
723           if (exceptionStatus != null) {
724             // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
725             // However the cancel is racy and this closed() may have already been queued when the
726             // cancellation occurred. Since other calls like onMessage() will throw away data if
727             // exceptionStatus != null, it is semantically essential that we _not_ use a status
728             // provided by the server.
729             status = exceptionStatus;
730             // Replace trailers to prevent mixing sources of status and trailers.
731             trailers = new Metadata();
732           }
733           cancelListenersShouldBeRemoved = true;
734           try {
735             closeObserver(observer, status, trailers);
736           } finally {
737             removeContextListenerAndCancelDeadlineFuture();
738             channelCallsTracer.reportCallEnded(status.isOk());
739           }
740         }
741       }
742 
743       callExecutor.execute(new StreamClosed());
744     }
745 
746     @Override
onReady()747     public void onReady() {
748       if (method.getType().clientSendsOneMessage()) {
749         return;
750       }
751       try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) {
752         PerfMark.attachTag(tag);
753         final Link link = PerfMark.linkOut();
754 
755         final class StreamOnReady extends ContextRunnable {
756           StreamOnReady() {
757             super(context);
758           }
759 
760           @Override
761           public void runInContext() {
762             try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) {
763               PerfMark.attachTag(tag);
764               PerfMark.linkIn(link);
765               runInternal();
766             }
767           }
768 
769           private void runInternal() {
770             if (exceptionStatus != null) {
771               return;
772             }
773             try {
774               observer.onReady();
775             } catch (Throwable t) {
776               exceptionThrown(
777                   Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
778             }
779           }
780         }
781 
782         callExecutor.execute(new StreamOnReady());
783       }
784     }
785   }
786 }
787