1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 23 import static io.grpc.Contexts.statusFromCancelled; 24 import static io.grpc.Status.DEADLINE_EXCEEDED; 25 import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY; 26 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; 27 import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; 28 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; 29 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 30 import static java.lang.Math.max; 31 32 import com.google.common.annotations.VisibleForTesting; 33 import com.google.common.base.MoreObjects; 34 import io.grpc.Attributes; 35 import io.grpc.CallOptions; 36 import io.grpc.ClientCall; 37 import io.grpc.ClientStreamTracer; 38 import io.grpc.Codec; 39 import io.grpc.Compressor; 40 import io.grpc.CompressorRegistry; 41 import io.grpc.Context; 42 import io.grpc.Context.CancellationListener; 43 import io.grpc.Deadline; 44 import io.grpc.DecompressorRegistry; 45 import io.grpc.InternalConfigSelector; 46 import io.grpc.InternalDecompressorRegistry; 47 import io.grpc.Metadata; 48 import io.grpc.MethodDescriptor; 49 import io.grpc.MethodDescriptor.MethodType; 50 import io.grpc.Status; 51 import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo; 52 import io.perfmark.Link; 53 import io.perfmark.PerfMark; 54 import io.perfmark.Tag; 55 import io.perfmark.TaskCloseable; 56 import java.io.InputStream; 57 import java.nio.charset.Charset; 58 import java.util.Locale; 59 import java.util.concurrent.CancellationException; 60 import java.util.concurrent.Executor; 61 import java.util.concurrent.ScheduledExecutorService; 62 import java.util.concurrent.ScheduledFuture; 63 import java.util.concurrent.TimeUnit; 64 import java.util.logging.Level; 65 import java.util.logging.Logger; 66 import javax.annotation.Nullable; 67 68 /** 69 * Implementation of {@link ClientCall}. 70 */ 71 final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> { 72 73 private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName()); 74 private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS 75 = "gzip".getBytes(Charset.forName("US-ASCII")); 76 private static final double NANO_TO_SECS = 1.0 * TimeUnit.SECONDS.toNanos(1); 77 78 private final MethodDescriptor<ReqT, RespT> method; 79 private final Tag tag; 80 private final Executor callExecutor; 81 private final boolean callExecutorIsDirect; 82 private final CallTracer channelCallsTracer; 83 private final Context context; 84 private volatile ScheduledFuture<?> deadlineCancellationFuture; 85 private final boolean unaryRequest; 86 private CallOptions callOptions; 87 private ClientStream stream; 88 private volatile boolean cancelListenersShouldBeRemoved; 89 private boolean cancelCalled; 90 private boolean halfCloseCalled; 91 private final ClientStreamProvider clientStreamProvider; 92 private final ContextCancellationListener cancellationListener = 93 new ContextCancellationListener(); 94 private final ScheduledExecutorService deadlineCancellationExecutor; 95 private boolean fullStreamDecompression; 96 private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); 97 private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); 98 ClientCallImpl( MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, ClientStreamProvider clientStreamProvider, ScheduledExecutorService deadlineCancellationExecutor, CallTracer channelCallsTracer, @Nullable InternalConfigSelector configSelector)99 ClientCallImpl( 100 MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, 101 ClientStreamProvider clientStreamProvider, 102 ScheduledExecutorService deadlineCancellationExecutor, 103 CallTracer channelCallsTracer, 104 // TODO(zdapeng): remove this arg 105 @Nullable InternalConfigSelector configSelector) { 106 this.method = method; 107 // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. 108 this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this)); 109 // If we know that the executor is a direct executor, we don't need to wrap it with a 110 // SerializingExecutor. This is purely for performance reasons. 111 // See https://github.com/grpc/grpc-java/issues/368 112 if (executor == directExecutor()) { 113 this.callExecutor = new SerializeReentrantCallsDirectExecutor(); 114 callExecutorIsDirect = true; 115 } else { 116 this.callExecutor = new SerializingExecutor(executor); 117 callExecutorIsDirect = false; 118 } 119 this.channelCallsTracer = channelCallsTracer; 120 // Propagate the context from the thread which initiated the call to all callbacks. 121 this.context = Context.current(); 122 this.unaryRequest = method.getType() == MethodType.UNARY 123 || method.getType() == MethodType.SERVER_STREAMING; 124 this.callOptions = callOptions; 125 this.clientStreamProvider = clientStreamProvider; 126 this.deadlineCancellationExecutor = deadlineCancellationExecutor; 127 PerfMark.event("ClientCall.<init>", tag); 128 } 129 130 private final class ContextCancellationListener implements CancellationListener { 131 @Override cancelled(Context context)132 public void cancelled(Context context) { 133 stream.cancel(statusFromCancelled(context)); 134 } 135 } 136 137 /** 138 * Provider of {@link ClientStream}s. 139 */ 140 interface ClientStreamProvider { newStream( MethodDescriptor<?, ?> method, CallOptions callOptions, Metadata headers, Context context)141 ClientStream newStream( 142 MethodDescriptor<?, ?> method, 143 CallOptions callOptions, 144 Metadata headers, 145 Context context); 146 } 147 setFullStreamDecompression(boolean fullStreamDecompression)148 ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) { 149 this.fullStreamDecompression = fullStreamDecompression; 150 return this; 151 } 152 setDecompressorRegistry(DecompressorRegistry decompressorRegistry)153 ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { 154 this.decompressorRegistry = decompressorRegistry; 155 return this; 156 } 157 setCompressorRegistry(CompressorRegistry compressorRegistry)158 ClientCallImpl<ReqT, RespT> setCompressorRegistry(CompressorRegistry compressorRegistry) { 159 this.compressorRegistry = compressorRegistry; 160 return this; 161 } 162 163 @VisibleForTesting prepareHeaders( Metadata headers, DecompressorRegistry decompressorRegistry, Compressor compressor, boolean fullStreamDecompression)164 static void prepareHeaders( 165 Metadata headers, 166 DecompressorRegistry decompressorRegistry, 167 Compressor compressor, 168 boolean fullStreamDecompression) { 169 headers.discardAll(CONTENT_LENGTH_KEY); 170 headers.discardAll(MESSAGE_ENCODING_KEY); 171 if (compressor != Codec.Identity.NONE) { 172 headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); 173 } 174 175 headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY); 176 byte[] advertisedEncodings = 177 InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry); 178 if (advertisedEncodings.length != 0) { 179 headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings); 180 } 181 182 headers.discardAll(CONTENT_ENCODING_KEY); 183 headers.discardAll(CONTENT_ACCEPT_ENCODING_KEY); 184 if (fullStreamDecompression) { 185 headers.put(CONTENT_ACCEPT_ENCODING_KEY, FULL_STREAM_DECOMPRESSION_ENCODINGS); 186 } 187 } 188 189 @Override start(Listener<RespT> observer, Metadata headers)190 public void start(Listener<RespT> observer, Metadata headers) { 191 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) { 192 PerfMark.attachTag(tag); 193 startInternal(observer, headers); 194 } 195 } 196 startInternal(Listener<RespT> observer, Metadata headers)197 private void startInternal(Listener<RespT> observer, Metadata headers) { 198 checkState(stream == null, "Already started"); 199 checkState(!cancelCalled, "call was cancelled"); 200 checkNotNull(observer, "observer"); 201 checkNotNull(headers, "headers"); 202 203 if (context.isCancelled()) { 204 // Context is already cancelled so no need to create a real stream, just notify the observer 205 // of cancellation via callback on the executor 206 stream = NoopClientStream.INSTANCE; 207 final Listener<RespT> finalObserver = observer; 208 class ClosedByContext extends ContextRunnable { 209 ClosedByContext() { 210 super(context); 211 } 212 213 @Override 214 public void runInContext() { 215 closeObserver(finalObserver, statusFromCancelled(context), new Metadata()); 216 } 217 } 218 219 callExecutor.execute(new ClosedByContext()); 220 return; 221 } 222 applyMethodConfig(); 223 final String compressorName = callOptions.getCompressor(); 224 Compressor compressor; 225 if (compressorName != null) { 226 compressor = compressorRegistry.lookupCompressor(compressorName); 227 if (compressor == null) { 228 stream = NoopClientStream.INSTANCE; 229 final Listener<RespT> finalObserver = observer; 230 class ClosedByNotFoundCompressor extends ContextRunnable { 231 ClosedByNotFoundCompressor() { 232 super(context); 233 } 234 235 @Override 236 public void runInContext() { 237 closeObserver( 238 finalObserver, 239 Status.INTERNAL.withDescription( 240 String.format("Unable to find compressor by name %s", compressorName)), 241 new Metadata()); 242 } 243 } 244 245 callExecutor.execute(new ClosedByNotFoundCompressor()); 246 return; 247 } 248 } else { 249 compressor = Codec.Identity.NONE; 250 } 251 prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression); 252 253 Deadline effectiveDeadline = effectiveDeadline(); 254 boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); 255 if (!deadlineExceeded) { 256 logIfContextNarrowedTimeout( 257 effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); 258 stream = clientStreamProvider.newStream(method, callOptions, headers, context); 259 } else { 260 ClientStreamTracer[] tracers = 261 GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false); 262 String deadlineName = 263 isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context"; 264 String description = String.format( 265 "ClientCall started after %s deadline was exceeded .9%f seconds ago", deadlineName, 266 effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS); 267 stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers); 268 } 269 270 if (callExecutorIsDirect) { 271 stream.optimizeForDirectExecutor(); 272 } 273 if (callOptions.getAuthority() != null) { 274 stream.setAuthority(callOptions.getAuthority()); 275 } 276 if (callOptions.getMaxInboundMessageSize() != null) { 277 stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize()); 278 } 279 if (callOptions.getMaxOutboundMessageSize() != null) { 280 stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize()); 281 } 282 if (effectiveDeadline != null) { 283 stream.setDeadline(effectiveDeadline); 284 } 285 stream.setCompressor(compressor); 286 if (fullStreamDecompression) { 287 stream.setFullStreamDecompression(fullStreamDecompression); 288 } 289 stream.setDecompressorRegistry(decompressorRegistry); 290 channelCallsTracer.reportCallStarted(); 291 stream.start(new ClientStreamListenerImpl(observer)); 292 293 // Delay any sources of cancellation after start(), because most of the transports are broken if 294 // they receive cancel before start. Issue #1343 has more details 295 296 // Propagate later Context cancellation to the remote side. 297 context.addListener(cancellationListener, directExecutor()); 298 if (effectiveDeadline != null 299 // If the context has the effective deadline, we don't need to schedule an extra task. 300 && !effectiveDeadline.equals(context.getDeadline()) 301 // If the channel has been terminated, we don't need to schedule an extra task. 302 && deadlineCancellationExecutor != null) { 303 deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); 304 } 305 if (cancelListenersShouldBeRemoved) { 306 // Race detected! ClientStreamListener.closed may have been called before 307 // deadlineCancellationFuture was set / context listener added, thereby preventing the future 308 // and listener from being cancelled. Go ahead and cancel again, just to be sure it 309 // was cancelled. 310 removeContextListenerAndCancelDeadlineFuture(); 311 } 312 } 313 applyMethodConfig()314 private void applyMethodConfig() { 315 MethodInfo info = callOptions.getOption(MethodInfo.KEY); 316 if (info == null) { 317 return; 318 } 319 if (info.timeoutNanos != null) { 320 Deadline newDeadline = Deadline.after(info.timeoutNanos, TimeUnit.NANOSECONDS); 321 Deadline existingDeadline = callOptions.getDeadline(); 322 // If the new deadline is sooner than the existing deadline, swap them. 323 if (existingDeadline == null || newDeadline.compareTo(existingDeadline) < 0) { 324 callOptions = callOptions.withDeadline(newDeadline); 325 } 326 } 327 if (info.waitForReady != null) { 328 callOptions = 329 info.waitForReady ? callOptions.withWaitForReady() : callOptions.withoutWaitForReady(); 330 } 331 if (info.maxInboundMessageSize != null) { 332 Integer existingLimit = callOptions.getMaxInboundMessageSize(); 333 if (existingLimit != null) { 334 callOptions = 335 callOptions.withMaxInboundMessageSize( 336 Math.min(existingLimit, info.maxInboundMessageSize)); 337 } else { 338 callOptions = callOptions.withMaxInboundMessageSize(info.maxInboundMessageSize); 339 } 340 } 341 if (info.maxOutboundMessageSize != null) { 342 Integer existingLimit = callOptions.getMaxOutboundMessageSize(); 343 if (existingLimit != null) { 344 callOptions = 345 callOptions.withMaxOutboundMessageSize( 346 Math.min(existingLimit, info.maxOutboundMessageSize)); 347 } else { 348 callOptions = callOptions.withMaxOutboundMessageSize(info.maxOutboundMessageSize); 349 } 350 } 351 } 352 logIfContextNarrowedTimeout( Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, @Nullable Deadline callDeadline)353 private static void logIfContextNarrowedTimeout( 354 Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, 355 @Nullable Deadline callDeadline) { 356 if (!log.isLoggable(Level.FINE) || effectiveDeadline == null 357 || !effectiveDeadline.equals(outerCallDeadline)) { 358 return; 359 } 360 361 long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS)); 362 StringBuilder builder = new StringBuilder(String.format( 363 Locale.US, 364 "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout)); 365 if (callDeadline == null) { 366 builder.append(" Explicit call timeout was not set."); 367 } else { 368 long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS); 369 builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout)); 370 } 371 372 log.fine(builder.toString()); 373 } 374 removeContextListenerAndCancelDeadlineFuture()375 private void removeContextListenerAndCancelDeadlineFuture() { 376 context.removeListener(cancellationListener); 377 ScheduledFuture<?> f = deadlineCancellationFuture; 378 if (f != null) { 379 f.cancel(false); 380 } 381 } 382 383 private class DeadlineTimer implements Runnable { 384 private final long remainingNanos; 385 DeadlineTimer(long remainingNanos)386 DeadlineTimer(long remainingNanos) { 387 this.remainingNanos = remainingNanos; 388 } 389 390 @Override run()391 public void run() { 392 InsightBuilder insight = new InsightBuilder(); 393 stream.appendTimeoutInsight(insight); 394 // DelayedStream.cancel() is safe to call from a thread that is different from where the 395 // stream is created. 396 long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); 397 long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); 398 399 StringBuilder buf = new StringBuilder(); 400 buf.append("deadline exceeded after "); 401 if (remainingNanos < 0) { 402 buf.append('-'); 403 } 404 buf.append(seconds); 405 buf.append(String.format(Locale.US, ".%09d", nanos)); 406 buf.append("s. "); 407 buf.append(insight); 408 stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString())); 409 } 410 } 411 startDeadlineTimer(Deadline deadline)412 private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) { 413 long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); 414 return deadlineCancellationExecutor.schedule( 415 new LogExceptionRunnable( 416 new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS); 417 } 418 419 @Nullable effectiveDeadline()420 private Deadline effectiveDeadline() { 421 // Call options and context are immutable, so we don't need to cache the deadline. 422 return min(callOptions.getDeadline(), context.getDeadline()); 423 } 424 425 @Nullable min(@ullable Deadline deadline0, @Nullable Deadline deadline1)426 private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { 427 if (deadline0 == null) { 428 return deadline1; 429 } 430 if (deadline1 == null) { 431 return deadline0; 432 } 433 return deadline0.minimum(deadline1); 434 } 435 isFirstMin(@ullable Deadline deadline0, @Nullable Deadline deadline1)436 private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { 437 if (deadline0 == null) { 438 return false; 439 } 440 if (deadline1 == null) { 441 return true; 442 } 443 return deadline0.isBefore(deadline1); 444 } 445 446 @Override request(int numMessages)447 public void request(int numMessages) { 448 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) { 449 PerfMark.attachTag(tag); 450 checkState(stream != null, "Not started"); 451 checkArgument(numMessages >= 0, "Number requested must be non-negative"); 452 stream.request(numMessages); 453 } 454 } 455 456 @Override cancel(@ullable String message, @Nullable Throwable cause)457 public void cancel(@Nullable String message, @Nullable Throwable cause) { 458 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) { 459 PerfMark.attachTag(tag); 460 cancelInternal(message, cause); 461 } 462 } 463 cancelInternal(@ullable String message, @Nullable Throwable cause)464 private void cancelInternal(@Nullable String message, @Nullable Throwable cause) { 465 if (message == null && cause == null) { 466 cause = new CancellationException("Cancelled without a message or cause"); 467 log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause); 468 } 469 if (cancelCalled) { 470 return; 471 } 472 cancelCalled = true; 473 try { 474 // Cancel is called in exception handling cases, so it may be the case that the 475 // stream was never successfully created or start has never been called. 476 if (stream != null) { 477 Status status = Status.CANCELLED; 478 if (message != null) { 479 status = status.withDescription(message); 480 } else { 481 status = status.withDescription("Call cancelled without message"); 482 } 483 if (cause != null) { 484 status = status.withCause(cause); 485 } 486 stream.cancel(status); 487 } 488 } finally { 489 removeContextListenerAndCancelDeadlineFuture(); 490 } 491 } 492 493 @Override halfClose()494 public void halfClose() { 495 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) { 496 PerfMark.attachTag(tag); 497 halfCloseInternal(); 498 } 499 } 500 halfCloseInternal()501 private void halfCloseInternal() { 502 checkState(stream != null, "Not started"); 503 checkState(!cancelCalled, "call was cancelled"); 504 checkState(!halfCloseCalled, "call already half-closed"); 505 halfCloseCalled = true; 506 stream.halfClose(); 507 } 508 509 @Override sendMessage(ReqT message)510 public void sendMessage(ReqT message) { 511 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) { 512 PerfMark.attachTag(tag); 513 sendMessageInternal(message); 514 } 515 } 516 sendMessageInternal(ReqT message)517 private void sendMessageInternal(ReqT message) { 518 checkState(stream != null, "Not started"); 519 checkState(!cancelCalled, "call was cancelled"); 520 checkState(!halfCloseCalled, "call was half-closed"); 521 try { 522 if (stream instanceof RetriableStream) { 523 @SuppressWarnings("unchecked") 524 RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream; 525 retriableStream.sendMessage(message); 526 } else { 527 stream.writeMessage(method.streamRequest(message)); 528 } 529 } catch (RuntimeException e) { 530 stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message")); 531 return; 532 } catch (Error e) { 533 stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error")); 534 throw e; 535 } 536 // For unary requests, we don't flush since we know that halfClose should be coming soon. This 537 // allows us to piggy-back the END_STREAM=true on the last message frame without opening the 538 // possibility of broken applications forgetting to call halfClose without noticing. 539 if (!unaryRequest) { 540 stream.flush(); 541 } 542 } 543 544 @Override setMessageCompression(boolean enabled)545 public void setMessageCompression(boolean enabled) { 546 checkState(stream != null, "Not started"); 547 stream.setMessageCompression(enabled); 548 } 549 550 @Override isReady()551 public boolean isReady() { 552 if (halfCloseCalled) { 553 return false; 554 } 555 return stream.isReady(); 556 } 557 558 @Override getAttributes()559 public Attributes getAttributes() { 560 if (stream != null) { 561 return stream.getAttributes(); 562 } 563 return Attributes.EMPTY; 564 } 565 closeObserver(Listener<RespT> observer, Status status, Metadata trailers)566 private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) { 567 observer.onClose(status, trailers); 568 } 569 570 @Override toString()571 public String toString() { 572 return MoreObjects.toStringHelper(this).add("method", method).toString(); 573 } 574 575 private class ClientStreamListenerImpl implements ClientStreamListener { 576 private final Listener<RespT> observer; 577 private Status exceptionStatus; 578 ClientStreamListenerImpl(Listener<RespT> observer)579 public ClientStreamListenerImpl(Listener<RespT> observer) { 580 this.observer = checkNotNull(observer, "observer"); 581 } 582 583 /** 584 * Cancels call and schedules onClose() notification. May only be called from the application 585 * thread. 586 */ exceptionThrown(Status status)587 private void exceptionThrown(Status status) { 588 // Since each RPC can have its own executor, we can only call onClose() when we are sure there 589 // will be no further callbacks. We set the status here and overwrite the onClose() details 590 // when it arrives. 591 exceptionStatus = status; 592 stream.cancel(status); 593 } 594 595 @Override headersRead(final Metadata headers)596 public void headersRead(final Metadata headers) { 597 try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) { 598 PerfMark.attachTag(tag); 599 final Link link = PerfMark.linkOut(); 600 final class HeadersRead extends ContextRunnable { 601 HeadersRead() { 602 super(context); 603 } 604 605 @Override 606 public void runInContext() { 607 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) { 608 PerfMark.attachTag(tag); 609 PerfMark.linkIn(link); 610 runInternal(); 611 } 612 } 613 614 private void runInternal() { 615 if (exceptionStatus != null) { 616 return; 617 } 618 try { 619 observer.onHeaders(headers); 620 } catch (Throwable t) { 621 exceptionThrown( 622 Status.CANCELLED.withCause(t).withDescription("Failed to read headers")); 623 } 624 } 625 } 626 627 callExecutor.execute(new HeadersRead()); 628 } 629 } 630 631 @Override messagesAvailable(final MessageProducer producer)632 public void messagesAvailable(final MessageProducer producer) { 633 try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) { 634 PerfMark.attachTag(tag); 635 final Link link = PerfMark.linkOut(); 636 final class MessagesAvailable extends ContextRunnable { 637 MessagesAvailable() { 638 super(context); 639 } 640 641 @Override 642 public void runInContext() { 643 try (TaskCloseable ignore = 644 PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) { 645 PerfMark.attachTag(tag); 646 PerfMark.linkIn(link); 647 runInternal(); 648 } 649 } 650 651 private void runInternal() { 652 if (exceptionStatus != null) { 653 GrpcUtil.closeQuietly(producer); 654 return; 655 } 656 try { 657 InputStream message; 658 while ((message = producer.next()) != null) { 659 try { 660 observer.onMessage(method.parseResponse(message)); 661 } catch (Throwable t) { 662 GrpcUtil.closeQuietly(message); 663 throw t; 664 } 665 message.close(); 666 } 667 } catch (Throwable t) { 668 GrpcUtil.closeQuietly(producer); 669 exceptionThrown( 670 Status.CANCELLED.withCause(t).withDescription("Failed to read message.")); 671 } 672 } 673 } 674 675 callExecutor.execute(new MessagesAvailable()); 676 } 677 } 678 679 @Override closed(Status status, RpcProgress rpcProgress, Metadata trailers)680 public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { 681 try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) { 682 PerfMark.attachTag(tag); 683 closedInternal(status, rpcProgress, trailers); 684 } 685 } 686 closedInternal( Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers)687 private void closedInternal( 688 Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) { 689 Deadline deadline = effectiveDeadline(); 690 if (status.getCode() == Status.Code.CANCELLED && deadline != null) { 691 // When the server's deadline expires, it can only reset the stream with CANCEL and no 692 // description. Since our timer may be delayed in firing, we double-check the deadline and 693 // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. 694 if (deadline.isExpired()) { 695 InsightBuilder insight = new InsightBuilder(); 696 stream.appendTimeoutInsight(insight); 697 status = DEADLINE_EXCEEDED.augmentDescription( 698 "ClientCall was cancelled at or after deadline. " + insight); 699 // Replace trailers to prevent mixing sources of status and trailers. 700 trailers = new Metadata(); 701 } 702 } 703 final Status savedStatus = status; 704 final Metadata savedTrailers = trailers; 705 final Link link = PerfMark.linkOut(); 706 final class StreamClosed extends ContextRunnable { 707 StreamClosed() { 708 super(context); 709 } 710 711 @Override 712 public void runInContext() { 713 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) { 714 PerfMark.attachTag(tag); 715 PerfMark.linkIn(link); 716 runInternal(); 717 } 718 } 719 720 private void runInternal() { 721 Status status = savedStatus; 722 Metadata trailers = savedTrailers; 723 if (exceptionStatus != null) { 724 // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel(). 725 // However the cancel is racy and this closed() may have already been queued when the 726 // cancellation occurred. Since other calls like onMessage() will throw away data if 727 // exceptionStatus != null, it is semantically essential that we _not_ use a status 728 // provided by the server. 729 status = exceptionStatus; 730 // Replace trailers to prevent mixing sources of status and trailers. 731 trailers = new Metadata(); 732 } 733 cancelListenersShouldBeRemoved = true; 734 try { 735 closeObserver(observer, status, trailers); 736 } finally { 737 removeContextListenerAndCancelDeadlineFuture(); 738 channelCallsTracer.reportCallEnded(status.isOk()); 739 } 740 } 741 } 742 743 callExecutor.execute(new StreamClosed()); 744 } 745 746 @Override onReady()747 public void onReady() { 748 if (method.getType().clientSendsOneMessage()) { 749 return; 750 } 751 try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) { 752 PerfMark.attachTag(tag); 753 final Link link = PerfMark.linkOut(); 754 755 final class StreamOnReady extends ContextRunnable { 756 StreamOnReady() { 757 super(context); 758 } 759 760 @Override 761 public void runInContext() { 762 try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) { 763 PerfMark.attachTag(tag); 764 PerfMark.linkIn(link); 765 runInternal(); 766 } 767 } 768 769 private void runInternal() { 770 if (exceptionStatus != null) { 771 return; 772 } 773 try { 774 observer.onReady(); 775 } catch (Throwable t) { 776 exceptionThrown( 777 Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.")); 778 } 779 } 780 } 781 782 callExecutor.execute(new StreamOnReady()); 783 } 784 } 785 } 786 } 787