• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static io.grpc.Contexts.statusFromCancelled;
24 import static io.grpc.Status.DEADLINE_EXCEEDED;
25 import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
26 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
27 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
28 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
29 import static java.lang.Math.max;
30 
31 import com.google.common.annotations.VisibleForTesting;
32 import com.google.common.base.MoreObjects;
33 import io.grpc.Attributes;
34 import io.grpc.CallOptions;
35 import io.grpc.ClientCall;
36 import io.grpc.Codec;
37 import io.grpc.Compressor;
38 import io.grpc.CompressorRegistry;
39 import io.grpc.Context;
40 import io.grpc.Context.CancellationListener;
41 import io.grpc.Deadline;
42 import io.grpc.DecompressorRegistry;
43 import io.grpc.InternalDecompressorRegistry;
44 import io.grpc.LoadBalancer.PickSubchannelArgs;
45 import io.grpc.Metadata;
46 import io.grpc.MethodDescriptor;
47 import io.grpc.MethodDescriptor.MethodType;
48 import io.grpc.Status;
49 import java.io.InputStream;
50 import java.nio.charset.Charset;
51 import java.util.concurrent.CancellationException;
52 import java.util.concurrent.Executor;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.ScheduledFuture;
55 import java.util.concurrent.TimeUnit;
56 import java.util.logging.Level;
57 import java.util.logging.Logger;
58 import javax.annotation.Nullable;
59 
60 /**
61  * Implementation of {@link ClientCall}.
62  */
63 final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
64 
65   private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
66   private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
67       = "gzip".getBytes(Charset.forName("US-ASCII"));
68 
69   private final MethodDescriptor<ReqT, RespT> method;
70   private final Executor callExecutor;
71   private final CallTracer channelCallsTracer;
72   private final Context context;
73   private volatile ScheduledFuture<?> deadlineCancellationFuture;
74   private final boolean unaryRequest;
75   private final CallOptions callOptions;
76   private final boolean retryEnabled;
77   private ClientStream stream;
78   private volatile boolean cancelListenersShouldBeRemoved;
79   private boolean cancelCalled;
80   private boolean halfCloseCalled;
81   private final ClientTransportProvider clientTransportProvider;
82   private final CancellationListener cancellationListener = new ContextCancellationListener();
83   private final ScheduledExecutorService deadlineCancellationExecutor;
84   private boolean fullStreamDecompression;
85   private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
86   private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
87 
ClientCallImpl( MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, ClientTransportProvider clientTransportProvider, ScheduledExecutorService deadlineCancellationExecutor, CallTracer channelCallsTracer, boolean retryEnabled)88   ClientCallImpl(
89       MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
90       ClientTransportProvider clientTransportProvider,
91       ScheduledExecutorService deadlineCancellationExecutor,
92       CallTracer channelCallsTracer,
93       boolean retryEnabled) {
94     this.method = method;
95     // If we know that the executor is a direct executor, we don't need to wrap it with a
96     // SerializingExecutor. This is purely for performance reasons.
97     // See https://github.com/grpc/grpc-java/issues/368
98     this.callExecutor = executor == directExecutor()
99         ? new SerializeReentrantCallsDirectExecutor()
100         : new SerializingExecutor(executor);
101     this.channelCallsTracer = channelCallsTracer;
102     // Propagate the context from the thread which initiated the call to all callbacks.
103     this.context = Context.current();
104     this.unaryRequest = method.getType() == MethodType.UNARY
105         || method.getType() == MethodType.SERVER_STREAMING;
106     this.callOptions = callOptions;
107     this.clientTransportProvider = clientTransportProvider;
108     this.deadlineCancellationExecutor = deadlineCancellationExecutor;
109     this.retryEnabled = retryEnabled;
110   }
111 
112   private final class ContextCancellationListener implements CancellationListener {
113     @Override
cancelled(Context context)114     public void cancelled(Context context) {
115       stream.cancel(statusFromCancelled(context));
116     }
117   }
118 
119   /**
120    * Provider of {@link ClientTransport}s.
121    */
122   // TODO(zdapeng): replace the two APIs with a single API: newStream()
123   interface ClientTransportProvider {
124     /**
125      * Returns a transport for a new call.
126      *
127      * @param args object containing call arguments.
128      */
get(PickSubchannelArgs args)129     ClientTransport get(PickSubchannelArgs args);
130 
newRetriableStream( MethodDescriptor<ReqT, ?> method, CallOptions callOptions, Metadata headers, Context context)131     <ReqT> RetriableStream<ReqT> newRetriableStream(
132         MethodDescriptor<ReqT, ?> method,
133         CallOptions callOptions,
134         Metadata headers,
135         Context context);
136 
137   }
138 
setFullStreamDecompression(boolean fullStreamDecompression)139   ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
140     this.fullStreamDecompression = fullStreamDecompression;
141     return this;
142   }
143 
setDecompressorRegistry(DecompressorRegistry decompressorRegistry)144   ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
145     this.decompressorRegistry = decompressorRegistry;
146     return this;
147   }
148 
setCompressorRegistry(CompressorRegistry compressorRegistry)149   ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) {
150     this.compressorRegistry = compressorRegistry;
151     return this;
152   }
153 
154   @VisibleForTesting
prepareHeaders( Metadata headers, DecompressorRegistry decompressorRegistry, Compressor compressor, boolean fullStreamDecompression)155   static void prepareHeaders(
156       Metadata headers,
157       DecompressorRegistry decompressorRegistry,
158       Compressor compressor,
159       boolean fullStreamDecompression) {
160     headers.discardAll(MESSAGE_ENCODING_KEY);
161     if (compressor != Codec.Identity.NONE) {
162       headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
163     }
164 
165     headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
166     byte[] advertisedEncodings =
167         InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
168     if (advertisedEncodings.length != 0) {
169       headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
170     }
171 
172     headers.discardAll(CONTENT_ENCODING_KEY);
173     headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY);
174     if (fullStreamDecompression) {
175       headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS);
176     }
177   }
178 
179   @Override
start(final Listener<RespT> observer, Metadata headers)180   public void start(final Listener<RespT> observer, Metadata headers) {
181     checkState(stream == null, "Already started");
182     checkState(!cancelCalled, "call was cancelled");
183     checkNotNull(observer, "observer");
184     checkNotNull(headers, "headers");
185 
186     if (context.isCancelled()) {
187       // Context is already cancelled so no need to create a real stream, just notify the observer
188       // of cancellation via callback on the executor
189       stream = NoopClientStream.INSTANCE;
190       class ClosedByContext extends ContextRunnable {
191         ClosedByContext() {
192           super(context);
193         }
194 
195         @Override
196         public void runInContext() {
197           closeObserver(observer, statusFromCancelled(context), new Metadata());
198         }
199       }
200 
201       callExecutor.execute(new ClosedByContext());
202       return;
203     }
204     final String compressorName = callOptions.getCompressor();
205     Compressor compressor = null;
206     if (compressorName != null) {
207       compressor = compressorRegistry.lookupCompressor(compressorName);
208       if (compressor == null) {
209         stream = NoopClientStream.INSTANCE;
210         class ClosedByNotFoundCompressor extends ContextRunnable {
211           ClosedByNotFoundCompressor() {
212             super(context);
213           }
214 
215           @Override
216           public void runInContext() {
217             closeObserver(
218                 observer,
219                 Status.INTERNAL.withDescription(
220                     String.format("Unable to find compressor by name %s", compressorName)),
221                 new Metadata());
222           }
223         }
224 
225         callExecutor.execute(new ClosedByNotFoundCompressor());
226         return;
227       }
228     } else {
229       compressor = Codec.Identity.NONE;
230     }
231     prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
232 
233     Deadline effectiveDeadline = effectiveDeadline();
234     boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
235     if (!deadlineExceeded) {
236       logIfContextNarrowedTimeout(
237           effectiveDeadline, callOptions.getDeadline(), context.getDeadline());
238       if (retryEnabled) {
239         stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context);
240       } else {
241         ClientTransport transport = clientTransportProvider.get(
242             new PickSubchannelArgsImpl(method, headers, callOptions));
243         Context origContext = context.attach();
244         try {
245           stream = transport.newStream(method, headers, callOptions);
246         } finally {
247           context.detach(origContext);
248         }
249       }
250     } else {
251       stream = new FailingClientStream(
252           DEADLINE_EXCEEDED.withDescription("deadline exceeded: " + effectiveDeadline));
253     }
254 
255     if (callOptions.getAuthority() != null) {
256       stream.setAuthority(callOptions.getAuthority());
257     }
258     if (callOptions.getMaxInboundMessageSize() != null) {
259       stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
260     }
261     if (callOptions.getMaxOutboundMessageSize() != null) {
262       stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
263     }
264     if (effectiveDeadline != null) {
265       stream.setDeadline(effectiveDeadline);
266     }
267     stream.setCompressor(compressor);
268     if (fullStreamDecompression) {
269       stream.setFullStreamDecompression(fullStreamDecompression);
270     }
271     stream.setDecompressorRegistry(decompressorRegistry);
272     channelCallsTracer.reportCallStarted();
273     stream.start(new ClientStreamListenerImpl(observer));
274 
275     // Delay any sources of cancellation after start(), because most of the transports are broken if
276     // they receive cancel before start. Issue #1343 has more details
277 
278     // Propagate later Context cancellation to the remote side.
279     context.addListener(cancellationListener, directExecutor());
280     if (effectiveDeadline != null
281         // If the context has the effective deadline, we don't need to schedule an extra task.
282         && context.getDeadline() != effectiveDeadline
283         // If the channel has been terminated, we don't need to schedule an extra task.
284         && deadlineCancellationExecutor != null) {
285       deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
286     }
287     if (cancelListenersShouldBeRemoved) {
288       // Race detected! ClientStreamListener.closed may have been called before
289       // deadlineCancellationFuture was set / context listener added, thereby preventing the future
290       // and listener from being cancelled. Go ahead and cancel again, just to be sure it
291       // was cancelled.
292       removeContextListenerAndCancelDeadlineFuture();
293     }
294   }
295 
logIfContextNarrowedTimeout( Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, @Nullable Deadline callDeadline)296   private static void logIfContextNarrowedTimeout(
297       Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
298       @Nullable Deadline callDeadline) {
299     if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
300         || outerCallDeadline != effectiveDeadline) {
301       return;
302     }
303 
304     long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
305     StringBuilder builder = new StringBuilder(String.format(
306         "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
307     if (callDeadline == null) {
308       builder.append(" Explicit call timeout was not set.");
309     } else {
310       long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
311       builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
312     }
313 
314     log.fine(builder.toString());
315   }
316 
removeContextListenerAndCancelDeadlineFuture()317   private void removeContextListenerAndCancelDeadlineFuture() {
318     context.removeListener(cancellationListener);
319     ScheduledFuture<?> f = deadlineCancellationFuture;
320     if (f != null) {
321       f.cancel(false);
322     }
323   }
324 
325   private class DeadlineTimer implements Runnable {
326     private final long remainingNanos;
327 
DeadlineTimer(long remainingNanos)328     DeadlineTimer(long remainingNanos) {
329       this.remainingNanos = remainingNanos;
330     }
331 
332     @Override
run()333     public void run() {
334       // DelayedStream.cancel() is safe to call from a thread that is different from where the
335       // stream is created.
336       stream.cancel(DEADLINE_EXCEEDED.augmentDescription(
337           String.format("deadline exceeded after %dns", remainingNanos)));
338     }
339   }
340 
startDeadlineTimer(Deadline deadline)341   private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
342     long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
343     return deadlineCancellationExecutor.schedule(
344         new LogExceptionRunnable(
345             new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
346   }
347 
348   @Nullable
effectiveDeadline()349   private Deadline effectiveDeadline() {
350     // Call options and context are immutable, so we don't need to cache the deadline.
351     return min(callOptions.getDeadline(), context.getDeadline());
352   }
353 
354   @Nullable
min(@ullable Deadline deadline0, @Nullable Deadline deadline1)355   private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
356     if (deadline0 == null) {
357       return deadline1;
358     }
359     if (deadline1 == null) {
360       return deadline0;
361     }
362     return deadline0.minimum(deadline1);
363   }
364 
365   @Override
request(int numMessages)366   public void request(int numMessages) {
367     checkState(stream != null, "Not started");
368     checkArgument(numMessages >= 0, "Number requested must be non-negative");
369     stream.request(numMessages);
370   }
371 
372   @Override
cancel(@ullable String message, @Nullable Throwable cause)373   public void cancel(@Nullable String message, @Nullable Throwable cause) {
374     if (message == null && cause == null) {
375       cause = new CancellationException("Cancelled without a message or cause");
376       log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
377     }
378     if (cancelCalled) {
379       return;
380     }
381     cancelCalled = true;
382     try {
383       // Cancel is called in exception handling cases, so it may be the case that the
384       // stream was never successfully created or start has never been called.
385       if (stream != null) {
386         Status status = Status.CANCELLED;
387         if (message != null) {
388           status = status.withDescription(message);
389         } else {
390           status = status.withDescription("Call cancelled without message");
391         }
392         if (cause != null) {
393           status = status.withCause(cause);
394         }
395         stream.cancel(status);
396       }
397     } finally {
398       removeContextListenerAndCancelDeadlineFuture();
399     }
400   }
401 
402   @Override
halfClose()403   public void halfClose() {
404     checkState(stream != null, "Not started");
405     checkState(!cancelCalled, "call was cancelled");
406     checkState(!halfCloseCalled, "call already half-closed");
407     halfCloseCalled = true;
408     stream.halfClose();
409   }
410 
411   @Override
sendMessage(ReqT message)412   public void sendMessage(ReqT message) {
413     checkState(stream != null, "Not started");
414     checkState(!cancelCalled, "call was cancelled");
415     checkState(!halfCloseCalled, "call was half-closed");
416     try {
417       if (stream instanceof RetriableStream) {
418         @SuppressWarnings("unchecked")
419         RetriableStream<ReqT> retriableStream = ((RetriableStream<ReqT>) stream);
420         retriableStream.sendMessage(message);
421       } else {
422         stream.writeMessage(method.streamRequest(message));
423       }
424     } catch (RuntimeException e) {
425       stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
426       return;
427     } catch (Error e) {
428       stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
429       throw e;
430     }
431     // For unary requests, we don't flush since we know that halfClose should be coming soon. This
432     // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
433     // possibility of broken applications forgetting to call halfClose without noticing.
434     if (!unaryRequest) {
435       stream.flush();
436     }
437   }
438 
439   @Override
setMessageCompression(boolean enabled)440   public void setMessageCompression(boolean enabled) {
441     checkState(stream != null, "Not started");
442     stream.setMessageCompression(enabled);
443   }
444 
445   @Override
isReady()446   public boolean isReady() {
447     return stream.isReady();
448   }
449 
450   @Override
getAttributes()451   public Attributes getAttributes() {
452     if (stream != null) {
453       return stream.getAttributes();
454     }
455     return Attributes.EMPTY;
456   }
457 
closeObserver(Listener<RespT> observer, Status status, Metadata trailers)458   private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
459     observer.onClose(status, trailers);
460   }
461 
462   @Override
toString()463   public String toString() {
464     return MoreObjects.toStringHelper(this).add("method", method).toString();
465   }
466 
467   private class ClientStreamListenerImpl implements ClientStreamListener {
468     private final Listener<RespT> observer;
469     private boolean closed;
470 
ClientStreamListenerImpl(Listener<RespT> observer)471     public ClientStreamListenerImpl(Listener<RespT> observer) {
472       this.observer = checkNotNull(observer, "observer");
473     }
474 
475     @Override
headersRead(final Metadata headers)476     public void headersRead(final Metadata headers) {
477       class HeadersRead extends ContextRunnable {
478         HeadersRead() {
479           super(context);
480         }
481 
482         @Override
483         public final void runInContext() {
484           try {
485             if (closed) {
486               return;
487             }
488             observer.onHeaders(headers);
489           } catch (Throwable t) {
490             Status status =
491                 Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
492             stream.cancel(status);
493             close(status, new Metadata());
494           }
495         }
496       }
497 
498       callExecutor.execute(new HeadersRead());
499     }
500 
501     @Override
messagesAvailable(final MessageProducer producer)502     public void messagesAvailable(final MessageProducer producer) {
503       class MessagesAvailable extends ContextRunnable {
504         MessagesAvailable() {
505           super(context);
506         }
507 
508         @Override
509         public final void runInContext() {
510           if (closed) {
511             GrpcUtil.closeQuietly(producer);
512             return;
513           }
514 
515           InputStream message;
516           try {
517             while ((message = producer.next()) != null) {
518               try {
519                 observer.onMessage(method.parseResponse(message));
520               } catch (Throwable t) {
521                 GrpcUtil.closeQuietly(message);
522                 throw t;
523               }
524               message.close();
525             }
526           } catch (Throwable t) {
527             GrpcUtil.closeQuietly(producer);
528             Status status =
529                 Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
530             stream.cancel(status);
531             close(status, new Metadata());
532           }
533         }
534       }
535 
536       callExecutor.execute(new MessagesAvailable());
537     }
538 
539     /**
540      * Must be called from application thread.
541      */
close(Status status, Metadata trailers)542     private void close(Status status, Metadata trailers) {
543       closed = true;
544       cancelListenersShouldBeRemoved = true;
545       try {
546         closeObserver(observer, status, trailers);
547       } finally {
548         removeContextListenerAndCancelDeadlineFuture();
549         channelCallsTracer.reportCallEnded(status.isOk());
550       }
551     }
552 
553     @Override
closed(Status status, Metadata trailers)554     public void closed(Status status, Metadata trailers) {
555       closed(status, RpcProgress.PROCESSED, trailers);
556     }
557 
558     @Override
closed(Status status, RpcProgress rpcProgress, Metadata trailers)559     public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
560       Deadline deadline = effectiveDeadline();
561       if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
562         // When the server's deadline expires, it can only reset the stream with CANCEL and no
563         // description. Since our timer may be delayed in firing, we double-check the deadline and
564         // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
565         if (deadline.isExpired()) {
566           status = DEADLINE_EXCEEDED;
567           // Replace trailers to prevent mixing sources of status and trailers.
568           trailers = new Metadata();
569         }
570       }
571       final Status savedStatus = status;
572       final Metadata savedTrailers = trailers;
573       class StreamClosed extends ContextRunnable {
574         StreamClosed() {
575           super(context);
576         }
577 
578         @Override
579         public final void runInContext() {
580           if (closed) {
581             // We intentionally don't keep the status or metadata from the server.
582             return;
583           }
584           close(savedStatus, savedTrailers);
585         }
586       }
587 
588       callExecutor.execute(new StreamClosed());
589     }
590 
591     @Override
onReady()592     public void onReady() {
593       class StreamOnReady extends ContextRunnable {
594         StreamOnReady() {
595           super(context);
596         }
597 
598         @Override
599         public final void runInContext() {
600           try {
601             observer.onReady();
602           } catch (Throwable t) {
603             Status status =
604                 Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.");
605             stream.cancel(status);
606             close(status, new Metadata());
607           }
608         }
609       }
610 
611       callExecutor.execute(new StreamOnReady());
612     }
613   }
614 }
615