1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 23 import static io.grpc.Contexts.statusFromCancelled; 24 import static io.grpc.Status.DEADLINE_EXCEEDED; 25 import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY; 26 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; 27 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; 28 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 29 import static java.lang.Math.max; 30 31 import com.google.common.annotations.VisibleForTesting; 32 import com.google.common.base.MoreObjects; 33 import io.grpc.Attributes; 34 import io.grpc.CallOptions; 35 import io.grpc.ClientCall; 36 import io.grpc.Codec; 37 import io.grpc.Compressor; 38 import io.grpc.CompressorRegistry; 39 import io.grpc.Context; 40 import io.grpc.Context.CancellationListener; 41 import io.grpc.Deadline; 42 import io.grpc.DecompressorRegistry; 43 import io.grpc.InternalDecompressorRegistry; 44 import io.grpc.LoadBalancer.PickSubchannelArgs; 45 import io.grpc.Metadata; 46 import io.grpc.MethodDescriptor; 47 import io.grpc.MethodDescriptor.MethodType; 48 import io.grpc.Status; 49 import java.io.InputStream; 50 import java.nio.charset.Charset; 51 import java.util.concurrent.CancellationException; 52 import java.util.concurrent.Executor; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.ScheduledFuture; 55 import java.util.concurrent.TimeUnit; 56 import java.util.logging.Level; 57 import java.util.logging.Logger; 58 import javax.annotation.Nullable; 59 60 /** 61 * Implementation of {@link ClientCall}. 62 */ 63 final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> { 64 65 private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName()); 66 private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS 67 = "gzip".getBytes(Charset.forName("US-ASCII")); 68 69 private final MethodDescriptor<ReqT, RespT> method; 70 private final Executor callExecutor; 71 private final CallTracer channelCallsTracer; 72 private final Context context; 73 private volatile ScheduledFuture<?> deadlineCancellationFuture; 74 private final boolean unaryRequest; 75 private final CallOptions callOptions; 76 private final boolean retryEnabled; 77 private ClientStream stream; 78 private volatile boolean cancelListenersShouldBeRemoved; 79 private boolean cancelCalled; 80 private boolean halfCloseCalled; 81 private final ClientTransportProvider clientTransportProvider; 82 private final CancellationListener cancellationListener = new ContextCancellationListener(); 83 private final ScheduledExecutorService deadlineCancellationExecutor; 84 private boolean fullStreamDecompression; 85 private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); 86 private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); 87 ClientCallImpl( MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, ClientTransportProvider clientTransportProvider, ScheduledExecutorService deadlineCancellationExecutor, CallTracer channelCallsTracer, boolean retryEnabled)88 ClientCallImpl( 89 MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, 90 ClientTransportProvider clientTransportProvider, 91 ScheduledExecutorService deadlineCancellationExecutor, 92 CallTracer channelCallsTracer, 93 boolean retryEnabled) { 94 this.method = method; 95 // If we know that the executor is a direct executor, we don't need to wrap it with a 96 // SerializingExecutor. This is purely for performance reasons. 97 // See https://github.com/grpc/grpc-java/issues/368 98 this.callExecutor = executor == directExecutor() 99 ? new SerializeReentrantCallsDirectExecutor() 100 : new SerializingExecutor(executor); 101 this.channelCallsTracer = channelCallsTracer; 102 // Propagate the context from the thread which initiated the call to all callbacks. 103 this.context = Context.current(); 104 this.unaryRequest = method.getType() == MethodType.UNARY 105 || method.getType() == MethodType.SERVER_STREAMING; 106 this.callOptions = callOptions; 107 this.clientTransportProvider = clientTransportProvider; 108 this.deadlineCancellationExecutor = deadlineCancellationExecutor; 109 this.retryEnabled = retryEnabled; 110 } 111 112 private final class ContextCancellationListener implements CancellationListener { 113 @Override cancelled(Context context)114 public void cancelled(Context context) { 115 stream.cancel(statusFromCancelled(context)); 116 } 117 } 118 119 /** 120 * Provider of {@link ClientTransport}s. 121 */ 122 // TODO(zdapeng): replace the two APIs with a single API: newStream() 123 interface ClientTransportProvider { 124 /** 125 * Returns a transport for a new call. 126 * 127 * @param args object containing call arguments. 128 */ get(PickSubchannelArgs args)129 ClientTransport get(PickSubchannelArgs args); 130 newRetriableStream( MethodDescriptor<ReqT, ?> method, CallOptions callOptions, Metadata headers, Context context)131 <ReqT> RetriableStream<ReqT> newRetriableStream( 132 MethodDescriptor<ReqT, ?> method, 133 CallOptions callOptions, 134 Metadata headers, 135 Context context); 136 137 } 138 setFullStreamDecompression(boolean fullStreamDecompression)139 ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) { 140 this.fullStreamDecompression = fullStreamDecompression; 141 return this; 142 } 143 setDecompressorRegistry(DecompressorRegistry decompressorRegistry)144 ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { 145 this.decompressorRegistry = decompressorRegistry; 146 return this; 147 } 148 setCompressorRegistry(CompressorRegistry compressorRegistry)149 ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) { 150 this.compressorRegistry = compressorRegistry; 151 return this; 152 } 153 154 @VisibleForTesting prepareHeaders( Metadata headers, DecompressorRegistry decompressorRegistry, Compressor compressor, boolean fullStreamDecompression)155 static void prepareHeaders( 156 Metadata headers, 157 DecompressorRegistry decompressorRegistry, 158 Compressor compressor, 159 boolean fullStreamDecompression) { 160 headers.discardAll(MESSAGE_ENCODING_KEY); 161 if (compressor != Codec.Identity.NONE) { 162 headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); 163 } 164 165 headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY); 166 byte[] advertisedEncodings = 167 InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry); 168 if (advertisedEncodings.length != 0) { 169 headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings); 170 } 171 172 headers.discardAll(CONTENT_ENCODING_KEY); 173 headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY); 174 if (fullStreamDecompression) { 175 headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS); 176 } 177 } 178 179 @Override start(final Listener<RespT> observer, Metadata headers)180 public void start(final Listener<RespT> observer, Metadata headers) { 181 checkState(stream == null, "Already started"); 182 checkState(!cancelCalled, "call was cancelled"); 183 checkNotNull(observer, "observer"); 184 checkNotNull(headers, "headers"); 185 186 if (context.isCancelled()) { 187 // Context is already cancelled so no need to create a real stream, just notify the observer 188 // of cancellation via callback on the executor 189 stream = NoopClientStream.INSTANCE; 190 class ClosedByContext extends ContextRunnable { 191 ClosedByContext() { 192 super(context); 193 } 194 195 @Override 196 public void runInContext() { 197 closeObserver(observer, statusFromCancelled(context), new Metadata()); 198 } 199 } 200 201 callExecutor.execute(new ClosedByContext()); 202 return; 203 } 204 final String compressorName = callOptions.getCompressor(); 205 Compressor compressor = null; 206 if (compressorName != null) { 207 compressor = compressorRegistry.lookupCompressor(compressorName); 208 if (compressor == null) { 209 stream = NoopClientStream.INSTANCE; 210 class ClosedByNotFoundCompressor extends ContextRunnable { 211 ClosedByNotFoundCompressor() { 212 super(context); 213 } 214 215 @Override 216 public void runInContext() { 217 closeObserver( 218 observer, 219 Status.INTERNAL.withDescription( 220 String.format("Unable to find compressor by name %s", compressorName)), 221 new Metadata()); 222 } 223 } 224 225 callExecutor.execute(new ClosedByNotFoundCompressor()); 226 return; 227 } 228 } else { 229 compressor = Codec.Identity.NONE; 230 } 231 prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression); 232 233 Deadline effectiveDeadline = effectiveDeadline(); 234 boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); 235 if (!deadlineExceeded) { 236 logIfContextNarrowedTimeout( 237 effectiveDeadline, callOptions.getDeadline(), context.getDeadline()); 238 if (retryEnabled) { 239 stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context); 240 } else { 241 ClientTransport transport = clientTransportProvider.get( 242 new PickSubchannelArgsImpl(method, headers, callOptions)); 243 Context origContext = context.attach(); 244 try { 245 stream = transport.newStream(method, headers, callOptions); 246 } finally { 247 context.detach(origContext); 248 } 249 } 250 } else { 251 stream = new FailingClientStream( 252 DEADLINE_EXCEEDED.withDescription("deadline exceeded: " + effectiveDeadline)); 253 } 254 255 if (callOptions.getAuthority() != null) { 256 stream.setAuthority(callOptions.getAuthority()); 257 } 258 if (callOptions.getMaxInboundMessageSize() != null) { 259 stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize()); 260 } 261 if (callOptions.getMaxOutboundMessageSize() != null) { 262 stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize()); 263 } 264 if (effectiveDeadline != null) { 265 stream.setDeadline(effectiveDeadline); 266 } 267 stream.setCompressor(compressor); 268 if (fullStreamDecompression) { 269 stream.setFullStreamDecompression(fullStreamDecompression); 270 } 271 stream.setDecompressorRegistry(decompressorRegistry); 272 channelCallsTracer.reportCallStarted(); 273 stream.start(new ClientStreamListenerImpl(observer)); 274 275 // Delay any sources of cancellation after start(), because most of the transports are broken if 276 // they receive cancel before start. Issue #1343 has more details 277 278 // Propagate later Context cancellation to the remote side. 279 context.addListener(cancellationListener, directExecutor()); 280 if (effectiveDeadline != null 281 // If the context has the effective deadline, we don't need to schedule an extra task. 282 && context.getDeadline() != effectiveDeadline 283 // If the channel has been terminated, we don't need to schedule an extra task. 284 && deadlineCancellationExecutor != null) { 285 deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); 286 } 287 if (cancelListenersShouldBeRemoved) { 288 // Race detected! ClientStreamListener.closed may have been called before 289 // deadlineCancellationFuture was set / context listener added, thereby preventing the future 290 // and listener from being cancelled. Go ahead and cancel again, just to be sure it 291 // was cancelled. 292 removeContextListenerAndCancelDeadlineFuture(); 293 } 294 } 295 logIfContextNarrowedTimeout( Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, @Nullable Deadline callDeadline)296 private static void logIfContextNarrowedTimeout( 297 Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, 298 @Nullable Deadline callDeadline) { 299 if (!log.isLoggable(Level.FINE) || effectiveDeadline == null 300 || outerCallDeadline != effectiveDeadline) { 301 return; 302 } 303 304 long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS)); 305 StringBuilder builder = new StringBuilder(String.format( 306 "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout)); 307 if (callDeadline == null) { 308 builder.append(" Explicit call timeout was not set."); 309 } else { 310 long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS); 311 builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout)); 312 } 313 314 log.fine(builder.toString()); 315 } 316 removeContextListenerAndCancelDeadlineFuture()317 private void removeContextListenerAndCancelDeadlineFuture() { 318 context.removeListener(cancellationListener); 319 ScheduledFuture<?> f = deadlineCancellationFuture; 320 if (f != null) { 321 f.cancel(false); 322 } 323 } 324 325 private class DeadlineTimer implements Runnable { 326 private final long remainingNanos; 327 DeadlineTimer(long remainingNanos)328 DeadlineTimer(long remainingNanos) { 329 this.remainingNanos = remainingNanos; 330 } 331 332 @Override run()333 public void run() { 334 // DelayedStream.cancel() is safe to call from a thread that is different from where the 335 // stream is created. 336 stream.cancel(DEADLINE_EXCEEDED.augmentDescription( 337 String.format("deadline exceeded after %dns", remainingNanos))); 338 } 339 } 340 startDeadlineTimer(Deadline deadline)341 private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) { 342 long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); 343 return deadlineCancellationExecutor.schedule( 344 new LogExceptionRunnable( 345 new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS); 346 } 347 348 @Nullable effectiveDeadline()349 private Deadline effectiveDeadline() { 350 // Call options and context are immutable, so we don't need to cache the deadline. 351 return min(callOptions.getDeadline(), context.getDeadline()); 352 } 353 354 @Nullable min(@ullable Deadline deadline0, @Nullable Deadline deadline1)355 private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { 356 if (deadline0 == null) { 357 return deadline1; 358 } 359 if (deadline1 == null) { 360 return deadline0; 361 } 362 return deadline0.minimum(deadline1); 363 } 364 365 @Override request(int numMessages)366 public void request(int numMessages) { 367 checkState(stream != null, "Not started"); 368 checkArgument(numMessages >= 0, "Number requested must be non-negative"); 369 stream.request(numMessages); 370 } 371 372 @Override cancel(@ullable String message, @Nullable Throwable cause)373 public void cancel(@Nullable String message, @Nullable Throwable cause) { 374 if (message == null && cause == null) { 375 cause = new CancellationException("Cancelled without a message or cause"); 376 log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause); 377 } 378 if (cancelCalled) { 379 return; 380 } 381 cancelCalled = true; 382 try { 383 // Cancel is called in exception handling cases, so it may be the case that the 384 // stream was never successfully created or start has never been called. 385 if (stream != null) { 386 Status status = Status.CANCELLED; 387 if (message != null) { 388 status = status.withDescription(message); 389 } else { 390 status = status.withDescription("Call cancelled without message"); 391 } 392 if (cause != null) { 393 status = status.withCause(cause); 394 } 395 stream.cancel(status); 396 } 397 } finally { 398 removeContextListenerAndCancelDeadlineFuture(); 399 } 400 } 401 402 @Override halfClose()403 public void halfClose() { 404 checkState(stream != null, "Not started"); 405 checkState(!cancelCalled, "call was cancelled"); 406 checkState(!halfCloseCalled, "call already half-closed"); 407 halfCloseCalled = true; 408 stream.halfClose(); 409 } 410 411 @Override sendMessage(ReqT message)412 public void sendMessage(ReqT message) { 413 checkState(stream != null, "Not started"); 414 checkState(!cancelCalled, "call was cancelled"); 415 checkState(!halfCloseCalled, "call was half-closed"); 416 try { 417 if (stream instanceof RetriableStream) { 418 @SuppressWarnings("unchecked") 419 RetriableStream<ReqT> retriableStream = ((RetriableStream<ReqT>) stream); 420 retriableStream.sendMessage(message); 421 } else { 422 stream.writeMessage(method.streamRequest(message)); 423 } 424 } catch (RuntimeException e) { 425 stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message")); 426 return; 427 } catch (Error e) { 428 stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error")); 429 throw e; 430 } 431 // For unary requests, we don't flush since we know that halfClose should be coming soon. This 432 // allows us to piggy-back the END_STREAM=true on the last message frame without opening the 433 // possibility of broken applications forgetting to call halfClose without noticing. 434 if (!unaryRequest) { 435 stream.flush(); 436 } 437 } 438 439 @Override setMessageCompression(boolean enabled)440 public void setMessageCompression(boolean enabled) { 441 checkState(stream != null, "Not started"); 442 stream.setMessageCompression(enabled); 443 } 444 445 @Override isReady()446 public boolean isReady() { 447 return stream.isReady(); 448 } 449 450 @Override getAttributes()451 public Attributes getAttributes() { 452 if (stream != null) { 453 return stream.getAttributes(); 454 } 455 return Attributes.EMPTY; 456 } 457 closeObserver(Listener<RespT> observer, Status status, Metadata trailers)458 private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) { 459 observer.onClose(status, trailers); 460 } 461 462 @Override toString()463 public String toString() { 464 return MoreObjects.toStringHelper(this).add("method", method).toString(); 465 } 466 467 private class ClientStreamListenerImpl implements ClientStreamListener { 468 private final Listener<RespT> observer; 469 private boolean closed; 470 ClientStreamListenerImpl(Listener<RespT> observer)471 public ClientStreamListenerImpl(Listener<RespT> observer) { 472 this.observer = checkNotNull(observer, "observer"); 473 } 474 475 @Override headersRead(final Metadata headers)476 public void headersRead(final Metadata headers) { 477 class HeadersRead extends ContextRunnable { 478 HeadersRead() { 479 super(context); 480 } 481 482 @Override 483 public final void runInContext() { 484 try { 485 if (closed) { 486 return; 487 } 488 observer.onHeaders(headers); 489 } catch (Throwable t) { 490 Status status = 491 Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); 492 stream.cancel(status); 493 close(status, new Metadata()); 494 } 495 } 496 } 497 498 callExecutor.execute(new HeadersRead()); 499 } 500 501 @Override messagesAvailable(final MessageProducer producer)502 public void messagesAvailable(final MessageProducer producer) { 503 class MessagesAvailable extends ContextRunnable { 504 MessagesAvailable() { 505 super(context); 506 } 507 508 @Override 509 public final void runInContext() { 510 if (closed) { 511 GrpcUtil.closeQuietly(producer); 512 return; 513 } 514 515 InputStream message; 516 try { 517 while ((message = producer.next()) != null) { 518 try { 519 observer.onMessage(method.parseResponse(message)); 520 } catch (Throwable t) { 521 GrpcUtil.closeQuietly(message); 522 throw t; 523 } 524 message.close(); 525 } 526 } catch (Throwable t) { 527 GrpcUtil.closeQuietly(producer); 528 Status status = 529 Status.CANCELLED.withCause(t).withDescription("Failed to read message."); 530 stream.cancel(status); 531 close(status, new Metadata()); 532 } 533 } 534 } 535 536 callExecutor.execute(new MessagesAvailable()); 537 } 538 539 /** 540 * Must be called from application thread. 541 */ close(Status status, Metadata trailers)542 private void close(Status status, Metadata trailers) { 543 closed = true; 544 cancelListenersShouldBeRemoved = true; 545 try { 546 closeObserver(observer, status, trailers); 547 } finally { 548 removeContextListenerAndCancelDeadlineFuture(); 549 channelCallsTracer.reportCallEnded(status.isOk()); 550 } 551 } 552 553 @Override closed(Status status, Metadata trailers)554 public void closed(Status status, Metadata trailers) { 555 closed(status, RpcProgress.PROCESSED, trailers); 556 } 557 558 @Override closed(Status status, RpcProgress rpcProgress, Metadata trailers)559 public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { 560 Deadline deadline = effectiveDeadline(); 561 if (status.getCode() == Status.Code.CANCELLED && deadline != null) { 562 // When the server's deadline expires, it can only reset the stream with CANCEL and no 563 // description. Since our timer may be delayed in firing, we double-check the deadline and 564 // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. 565 if (deadline.isExpired()) { 566 status = DEADLINE_EXCEEDED; 567 // Replace trailers to prevent mixing sources of status and trailers. 568 trailers = new Metadata(); 569 } 570 } 571 final Status savedStatus = status; 572 final Metadata savedTrailers = trailers; 573 class StreamClosed extends ContextRunnable { 574 StreamClosed() { 575 super(context); 576 } 577 578 @Override 579 public final void runInContext() { 580 if (closed) { 581 // We intentionally don't keep the status or metadata from the server. 582 return; 583 } 584 close(savedStatus, savedTrailers); 585 } 586 } 587 588 callExecutor.execute(new StreamClosed()); 589 } 590 591 @Override onReady()592 public void onReady() { 593 class StreamOnReady extends ContextRunnable { 594 StreamOnReady() { 595 super(context); 596 } 597 598 @Override 599 public final void runInContext() { 600 try { 601 observer.onReady(); 602 } catch (Throwable t) { 603 Status status = 604 Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); 605 stream.cancel(status); 606 close(status, new Metadata()); 607 } 608 } 609 } 610 611 callExecutor.execute(new StreamOnReady()); 612 } 613 } 614 } 615