1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.Objects; 25 import io.grpc.Attributes; 26 import io.grpc.ClientStreamTracer; 27 import io.grpc.Compressor; 28 import io.grpc.Deadline; 29 import io.grpc.DecompressorRegistry; 30 import io.grpc.Metadata; 31 import io.grpc.MethodDescriptor; 32 import io.grpc.Status; 33 import io.grpc.SynchronizationContext; 34 import io.grpc.internal.ClientStreamListener.RpcProgress; 35 import java.io.InputStream; 36 import java.lang.Thread.UncaughtExceptionHandler; 37 import java.util.ArrayList; 38 import java.util.Collection; 39 import java.util.Collections; 40 import java.util.List; 41 import java.util.Random; 42 import java.util.concurrent.Executor; 43 import java.util.concurrent.Future; 44 import java.util.concurrent.ScheduledExecutorService; 45 import java.util.concurrent.TimeUnit; 46 import java.util.concurrent.atomic.AtomicBoolean; 47 import java.util.concurrent.atomic.AtomicInteger; 48 import java.util.concurrent.atomic.AtomicLong; 49 import javax.annotation.CheckForNull; 50 import javax.annotation.CheckReturnValue; 51 import 
javax.annotation.Nullable; 52 import javax.annotation.concurrent.GuardedBy; 53 54 /** A logical {@link ClientStream} that is retriable. */ 55 abstract class RetriableStream<ReqT> implements ClientStream { 56 @VisibleForTesting 57 static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS = 58 Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER); 59 60 @VisibleForTesting 61 static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS = 62 Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER); 63 64 private static final Status CANCELLED_BECAUSE_COMMITTED = 65 Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed"); 66 67 private final MethodDescriptor<ReqT, ?> method; 68 private final Executor callExecutor; 69 private final Executor listenerSerializeExecutor = new SynchronizationContext( 70 new UncaughtExceptionHandler() { 71 @Override 72 public void uncaughtException(Thread t, Throwable e) { 73 throw Status.fromThrowable(e) 74 .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.") 75 .asRuntimeException(); 76 } 77 } 78 ); 79 private final ScheduledExecutorService scheduledExecutorService; 80 // Must not modify it. 81 private final Metadata headers; 82 @Nullable 83 private final RetryPolicy retryPolicy; 84 @Nullable 85 private final HedgingPolicy hedgingPolicy; 86 private final boolean isHedging; 87 88 /** Must be held when updating state, accessing state.buffer, or certain substream attributes. 
*/ 89 private final Object lock = new Object(); 90 91 private final ChannelBufferMeter channelBufferUsed; 92 private final long perRpcBufferLimit; 93 private final long channelBufferLimit; 94 @Nullable 95 private final Throttle throttle; 96 @GuardedBy("lock") 97 private final InsightBuilder closedSubstreamsInsight = new InsightBuilder(); 98 99 private volatile State state = new State( 100 new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false, 101 false, 0); 102 103 /** 104 * Either non-local transparent retry happened or reached server's application logic. 105 * 106 * <p>Note that local-only transparent retries are unlimited. 107 */ 108 private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean(); 109 private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger(); 110 private final AtomicInteger inFlightSubStreams = new AtomicInteger(); 111 private SavedCloseMasterListenerReason savedCloseMasterListenerReason; 112 113 // Used for recording the share of buffer used for the current call out of the channel buffer. 114 // This field would not be necessary if there is no channel buffer limit. 
115 @GuardedBy("lock") 116 private long perRpcBufferUsed; 117 118 private ClientStreamListener masterListener; 119 @GuardedBy("lock") 120 private FutureCanceller scheduledRetry; 121 @GuardedBy("lock") 122 private FutureCanceller scheduledHedging; 123 private long nextBackoffIntervalNanos; 124 private Status cancellationStatus; 125 private boolean isClosed; 126 RetriableStream( MethodDescriptor<ReqT, ?> method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, Executor callExecutor, ScheduledExecutorService scheduledExecutorService, @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy, @Nullable Throttle throttle)127 RetriableStream( 128 MethodDescriptor<ReqT, ?> method, Metadata headers, 129 ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, 130 Executor callExecutor, ScheduledExecutorService scheduledExecutorService, 131 @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy, 132 @Nullable Throttle throttle) { 133 this.method = method; 134 this.channelBufferUsed = channelBufferUsed; 135 this.perRpcBufferLimit = perRpcBufferLimit; 136 this.channelBufferLimit = channelBufferLimit; 137 this.callExecutor = callExecutor; 138 this.scheduledExecutorService = scheduledExecutorService; 139 this.headers = headers; 140 this.retryPolicy = retryPolicy; 141 if (retryPolicy != null) { 142 this.nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos; 143 } 144 this.hedgingPolicy = hedgingPolicy; 145 checkArgument( 146 retryPolicy == null || hedgingPolicy == null, 147 "Should not provide both retryPolicy and hedgingPolicy"); 148 this.isHedging = hedgingPolicy != null; 149 this.throttle = throttle; 150 } 151 152 @SuppressWarnings("GuardedBy") 153 @Nullable // null if already committed 154 @CheckReturnValue commit(final Substream winningSubstream)155 private Runnable commit(final Substream winningSubstream) { 156 157 synchronized (lock) { 158 if 
(state.winningSubstream != null) { 159 return null; 160 } 161 final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams; 162 163 state = state.committed(winningSubstream); 164 165 // subtract the share of this RPC from channelBufferUsed. 166 channelBufferUsed.addAndGet(-perRpcBufferUsed); 167 168 final Future<?> retryFuture; 169 if (scheduledRetry != null) { 170 // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead 171 // found: 'this.lock' 172 retryFuture = scheduledRetry.markCancelled(); 173 scheduledRetry = null; 174 } else { 175 retryFuture = null; 176 } 177 // cancel the scheduled hedging if it is scheduled prior to the commitment 178 final Future<?> hedgingFuture; 179 if (scheduledHedging != null) { 180 // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead 181 // found: 'this.lock' 182 hedgingFuture = scheduledHedging.markCancelled(); 183 scheduledHedging = null; 184 } else { 185 hedgingFuture = null; 186 } 187 188 class CommitTask implements Runnable { 189 @Override 190 public void run() { 191 // For hedging only, not needed for normal retry 192 for (Substream substream : savedDrainedSubstreams) { 193 if (substream != winningSubstream) { 194 substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED); 195 } 196 } 197 if (retryFuture != null) { 198 retryFuture.cancel(false); 199 } 200 if (hedgingFuture != null) { 201 hedgingFuture.cancel(false); 202 } 203 204 postCommit(); 205 } 206 } 207 208 return new CommitTask(); 209 } 210 } 211 postCommit()212 abstract void postCommit(); 213 214 /** 215 * Calls commit() and if successful runs the post commit task. 216 */ commitAndRun(Substream winningSubstream)217 private void commitAndRun(Substream winningSubstream) { 218 Runnable postCommitTask = commit(winningSubstream); 219 220 if (postCommitTask != null) { 221 postCommitTask.run(); 222 } 223 } 224 225 // returns null means we should not create new sub streams, e.g. 
cancelled or 226 // other close condition is met for retriableStream. 227 @Nullable createSubstream(int previousAttemptCount, boolean isTransparentRetry)228 private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) { 229 int inFlight; 230 do { 231 inFlight = inFlightSubStreams.get(); 232 if (inFlight < 0) { 233 return null; 234 } 235 } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1)); 236 Substream sub = new Substream(previousAttemptCount); 237 // one tracer per substream 238 final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub); 239 ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() { 240 @Override 241 public ClientStreamTracer newClientStreamTracer( 242 ClientStreamTracer.StreamInfo info, Metadata headers) { 243 return bufferSizeTracer; 244 } 245 }; 246 247 Metadata newHeaders = updateHeaders(headers, previousAttemptCount); 248 // NOTICE: This set _must_ be done before stream.start() and it actually is. 249 sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry); 250 return sub; 251 } 252 253 /** 254 * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned 255 * Client stream is not yet started. 256 */ newSubstream( Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts, boolean isTransparentRetry)257 abstract ClientStream newSubstream( 258 Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts, 259 boolean isTransparentRetry); 260 261 /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. 
*/ 262 @VisibleForTesting updateHeaders( Metadata originalHeaders, int previousAttemptCount)263 final Metadata updateHeaders( 264 Metadata originalHeaders, int previousAttemptCount) { 265 Metadata newHeaders = new Metadata(); 266 newHeaders.merge(originalHeaders); 267 if (previousAttemptCount > 0) { 268 newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount)); 269 } 270 return newHeaders; 271 } 272 drain(Substream substream)273 private void drain(Substream substream) { 274 int index = 0; 275 int chunk = 0x80; 276 List<BufferEntry> list = null; 277 boolean streamStarted = false; 278 Runnable onReadyRunnable = null; 279 280 while (true) { 281 State savedState; 282 283 synchronized (lock) { 284 savedState = state; 285 if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { 286 // committed but not me, to be cancelled 287 break; 288 } 289 if (savedState.cancelled) { 290 break; 291 } 292 if (index == savedState.buffer.size()) { // I'm drained 293 state = savedState.substreamDrained(substream); 294 if (!isReady()) { 295 return; 296 } 297 onReadyRunnable = new Runnable() { 298 @Override 299 public void run() { 300 if (!isClosed) { 301 masterListener.onReady(); 302 } 303 } 304 }; 305 break; 306 } 307 308 if (substream.closed) { 309 return; 310 } 311 312 int stop = Math.min(index + chunk, savedState.buffer.size()); 313 if (list == null) { 314 list = new ArrayList<>(savedState.buffer.subList(index, stop)); 315 } else { 316 list.clear(); 317 list.addAll(savedState.buffer.subList(index, stop)); 318 } 319 index = stop; 320 } 321 322 for (BufferEntry bufferEntry : list) { 323 bufferEntry.runWith(substream); 324 if (bufferEntry instanceof RetriableStream.StartEntry) { 325 streamStarted = true; 326 } 327 savedState = state; 328 if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { 329 // committed but not me, to be cancelled 330 break; 331 } 332 if (savedState.cancelled) { 333 break; 334 } 335 
} 336 } 337 338 if (onReadyRunnable != null) { 339 listenerSerializeExecutor.execute(onReadyRunnable); 340 return; 341 } 342 343 if (!streamStarted) { 344 // Start stream so inFlightSubStreams is decremented in Sublistener.closed() 345 substream.stream.start(new Sublistener(substream)); 346 } 347 substream.stream.cancel( 348 state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); 349 } 350 351 /** 352 * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown. 353 */ 354 @CheckReturnValue 355 @Nullable prestart()356 abstract Status prestart(); 357 358 class StartEntry implements BufferEntry { 359 @Override runWith(Substream substream)360 public void runWith(Substream substream) { 361 substream.stream.start(new Sublistener(substream)); 362 } 363 } 364 365 /** Starts the first PRC attempt. */ 366 @Override start(ClientStreamListener listener)367 public final void start(ClientStreamListener listener) { 368 masterListener = listener; 369 370 Status shutdownStatus = prestart(); 371 372 if (shutdownStatus != null) { 373 cancel(shutdownStatus); 374 return; 375 } 376 377 synchronized (lock) { 378 state.buffer.add(new StartEntry()); 379 } 380 381 Substream substream = createSubstream(0, false); 382 if (substream == null) { 383 return; 384 } 385 if (isHedging) { 386 FutureCanceller scheduledHedgingRef = null; 387 388 synchronized (lock) { 389 state = state.addActiveHedge(substream); 390 if (hasPotentialHedging(state) 391 && (throttle == null || throttle.isAboveThreshold())) { 392 scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock); 393 } 394 } 395 396 if (scheduledHedgingRef != null) { 397 scheduledHedgingRef.setFuture( 398 scheduledExecutorService.schedule( 399 new HedgingRunnable(scheduledHedgingRef), 400 hedgingPolicy.hedgingDelayNanos, 401 TimeUnit.NANOSECONDS)); 402 } 403 } 404 405 drain(substream); 406 } 407 408 @SuppressWarnings("GuardedBy") pushbackHedging(@ullable Integer delayMillis)409 private 
void pushbackHedging(@Nullable Integer delayMillis) { 410 if (delayMillis == null) { 411 return; 412 } 413 if (delayMillis < 0) { 414 freezeHedging(); 415 return; 416 } 417 418 // Cancels the current scheduledHedging and reschedules a new one. 419 FutureCanceller future; 420 Future<?> futureToBeCancelled; 421 422 synchronized (lock) { 423 if (scheduledHedging == null) { 424 return; 425 } 426 427 // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead 428 // found: 'this.lock' 429 futureToBeCancelled = scheduledHedging.markCancelled(); 430 scheduledHedging = future = new FutureCanceller(lock); 431 } 432 433 if (futureToBeCancelled != null) { 434 futureToBeCancelled.cancel(false); 435 } 436 future.setFuture(scheduledExecutorService.schedule( 437 new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS)); 438 } 439 440 private final class HedgingRunnable implements Runnable { 441 442 // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed 443 // by a positive push-back just after newSubstream is instantiated, so that we can double check. 444 final FutureCanceller scheduledHedgingRef; 445 HedgingRunnable(FutureCanceller scheduledHedging)446 HedgingRunnable(FutureCanceller scheduledHedging) { 447 scheduledHedgingRef = scheduledHedging; 448 } 449 450 @Override run()451 public void run() { 452 // It's safe to read state.hedgingAttemptCount here. 453 // If this run is not cancelled, the value of state.hedgingAttemptCount won't change 454 // until state.addActiveHedge() is called subsequently, even the state could possibly 455 // change. 
456 Substream newSubstream = createSubstream(state.hedgingAttemptCount, false); 457 if (newSubstream == null) { 458 return; 459 } 460 callExecutor.execute( 461 new Runnable() { 462 @SuppressWarnings("GuardedBy") 463 @Override 464 public void run() { 465 boolean cancelled = false; 466 FutureCanceller future = null; 467 468 synchronized (lock) { 469 // TODO(b/145386688): This access should be guarded by 470 // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found: 471 // 'RetriableStream.this.lock' 472 if (scheduledHedgingRef.isCancelled()) { 473 cancelled = true; 474 } else { 475 state = state.addActiveHedge(newSubstream); 476 if (hasPotentialHedging(state) 477 && (throttle == null || throttle.isAboveThreshold())) { 478 scheduledHedging = future = new FutureCanceller(lock); 479 } else { 480 state = state.freezeHedging(); 481 scheduledHedging = null; 482 } 483 } 484 } 485 486 if (cancelled) { 487 // Start stream so inFlightSubStreams is decremented in Sublistener.closed() 488 newSubstream.stream.start(new Sublistener(newSubstream)); 489 newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging")); 490 return; 491 } 492 if (future != null) { 493 future.setFuture( 494 scheduledExecutorService.schedule( 495 new HedgingRunnable(future), 496 hedgingPolicy.hedgingDelayNanos, 497 TimeUnit.NANOSECONDS)); 498 } 499 drain(newSubstream); 500 } 501 }); 502 } 503 } 504 505 @Override cancel(final Status reason)506 public final void cancel(final Status reason) { 507 Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); 508 noopSubstream.stream = new NoopClientStream(); 509 Runnable runnable = commit(noopSubstream); 510 511 if (runnable != null) { 512 synchronized (lock) { 513 state = state.substreamDrained(noopSubstream); 514 } 515 runnable.run(); 516 safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata()); 517 return; 518 } 519 520 Substream winningSubstreamToCancel = null; 521 synchronized (lock) { 522 
if (state.drainedSubstreams.contains(state.winningSubstream)) { 523 winningSubstreamToCancel = state.winningSubstream; 524 } else { // the winningSubstream will be cancelled while draining 525 cancellationStatus = reason; 526 } 527 state = state.cancelled(); 528 } 529 if (winningSubstreamToCancel != null) { 530 winningSubstreamToCancel.stream.cancel(reason); 531 } 532 } 533 delayOrExecute(BufferEntry bufferEntry)534 private void delayOrExecute(BufferEntry bufferEntry) { 535 Collection<Substream> savedDrainedSubstreams; 536 synchronized (lock) { 537 if (!state.passThrough) { 538 state.buffer.add(bufferEntry); 539 } 540 savedDrainedSubstreams = state.drainedSubstreams; 541 } 542 543 for (Substream substream : savedDrainedSubstreams) { 544 bufferEntry.runWith(substream); 545 } 546 } 547 548 /** 549 * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use 550 * InputStream for buffering. 551 */ 552 @Override writeMessage(InputStream message)553 public final void writeMessage(InputStream message) { 554 throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly"); 555 } 556 sendMessage(final ReqT message)557 final void sendMessage(final ReqT message) { 558 State savedState = state; 559 if (savedState.passThrough) { 560 savedState.winningSubstream.stream.writeMessage(method.streamRequest(message)); 561 return; 562 } 563 564 class SendMessageEntry implements BufferEntry { 565 @Override 566 public void runWith(Substream substream) { 567 substream.stream.writeMessage(method.streamRequest(message)); 568 // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by 569 // flushes (or half close), but retry appears to have a code path that the flushes may 570 // not happen. The code needs to be fixed and this removed. See #9340. 
571 substream.stream.flush(); 572 } 573 } 574 575 delayOrExecute(new SendMessageEntry()); 576 } 577 578 @Override request(final int numMessages)579 public final void request(final int numMessages) { 580 State savedState = state; 581 if (savedState.passThrough) { 582 savedState.winningSubstream.stream.request(numMessages); 583 return; 584 } 585 586 class RequestEntry implements BufferEntry { 587 @Override 588 public void runWith(Substream substream) { 589 substream.stream.request(numMessages); 590 } 591 } 592 593 delayOrExecute(new RequestEntry()); 594 } 595 596 @Override flush()597 public final void flush() { 598 State savedState = state; 599 if (savedState.passThrough) { 600 savedState.winningSubstream.stream.flush(); 601 return; 602 } 603 604 class FlushEntry implements BufferEntry { 605 @Override 606 public void runWith(Substream substream) { 607 substream.stream.flush(); 608 } 609 } 610 611 delayOrExecute(new FlushEntry()); 612 } 613 614 @Override isReady()615 public final boolean isReady() { 616 for (Substream substream : state.drainedSubstreams) { 617 if (substream.stream.isReady()) { 618 return true; 619 } 620 } 621 return false; 622 } 623 624 @Override optimizeForDirectExecutor()625 public void optimizeForDirectExecutor() { 626 class OptimizeDirectEntry implements BufferEntry { 627 @Override 628 public void runWith(Substream substream) { 629 substream.stream.optimizeForDirectExecutor(); 630 } 631 } 632 633 delayOrExecute(new OptimizeDirectEntry()); 634 } 635 636 @Override setCompressor(final Compressor compressor)637 public final void setCompressor(final Compressor compressor) { 638 class CompressorEntry implements BufferEntry { 639 @Override 640 public void runWith(Substream substream) { 641 substream.stream.setCompressor(compressor); 642 } 643 } 644 645 delayOrExecute(new CompressorEntry()); 646 } 647 648 @Override setFullStreamDecompression(final boolean fullStreamDecompression)649 public final void setFullStreamDecompression(final boolean 
fullStreamDecompression) { 650 class FullStreamDecompressionEntry implements BufferEntry { 651 @Override 652 public void runWith(Substream substream) { 653 substream.stream.setFullStreamDecompression(fullStreamDecompression); 654 } 655 } 656 657 delayOrExecute(new FullStreamDecompressionEntry()); 658 } 659 660 @Override setMessageCompression(final boolean enable)661 public final void setMessageCompression(final boolean enable) { 662 class MessageCompressionEntry implements BufferEntry { 663 @Override 664 public void runWith(Substream substream) { 665 substream.stream.setMessageCompression(enable); 666 } 667 } 668 669 delayOrExecute(new MessageCompressionEntry()); 670 } 671 672 @Override halfClose()673 public final void halfClose() { 674 class HalfCloseEntry implements BufferEntry { 675 @Override 676 public void runWith(Substream substream) { 677 substream.stream.halfClose(); 678 } 679 } 680 681 delayOrExecute(new HalfCloseEntry()); 682 } 683 684 @Override setAuthority(final String authority)685 public final void setAuthority(final String authority) { 686 class AuthorityEntry implements BufferEntry { 687 @Override 688 public void runWith(Substream substream) { 689 substream.stream.setAuthority(authority); 690 } 691 } 692 693 delayOrExecute(new AuthorityEntry()); 694 } 695 696 @Override setDecompressorRegistry(final DecompressorRegistry decompressorRegistry)697 public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) { 698 class DecompressorRegistryEntry implements BufferEntry { 699 @Override 700 public void runWith(Substream substream) { 701 substream.stream.setDecompressorRegistry(decompressorRegistry); 702 } 703 } 704 705 delayOrExecute(new DecompressorRegistryEntry()); 706 } 707 708 @Override setMaxInboundMessageSize(final int maxSize)709 public final void setMaxInboundMessageSize(final int maxSize) { 710 class MaxInboundMessageSizeEntry implements BufferEntry { 711 @Override 712 public void runWith(Substream substream) { 713 
substream.stream.setMaxInboundMessageSize(maxSize); 714 } 715 } 716 717 delayOrExecute(new MaxInboundMessageSizeEntry()); 718 } 719 720 @Override setMaxOutboundMessageSize(final int maxSize)721 public final void setMaxOutboundMessageSize(final int maxSize) { 722 class MaxOutboundMessageSizeEntry implements BufferEntry { 723 @Override 724 public void runWith(Substream substream) { 725 substream.stream.setMaxOutboundMessageSize(maxSize); 726 } 727 } 728 729 delayOrExecute(new MaxOutboundMessageSizeEntry()); 730 } 731 732 @Override setDeadline(final Deadline deadline)733 public final void setDeadline(final Deadline deadline) { 734 class DeadlineEntry implements BufferEntry { 735 @Override 736 public void runWith(Substream substream) { 737 substream.stream.setDeadline(deadline); 738 } 739 } 740 741 delayOrExecute(new DeadlineEntry()); 742 } 743 744 @Override getAttributes()745 public final Attributes getAttributes() { 746 if (state.winningSubstream != null) { 747 return state.winningSubstream.stream.getAttributes(); 748 } 749 return Attributes.EMPTY; 750 } 751 752 @Override appendTimeoutInsight(InsightBuilder insight)753 public void appendTimeoutInsight(InsightBuilder insight) { 754 State currentState; 755 synchronized (lock) { 756 insight.appendKeyValue("closed", closedSubstreamsInsight); 757 currentState = state; 758 } 759 if (currentState.winningSubstream != null) { 760 // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor 761 // of the winning substream, they may not have received closed() notifications yet, thus they 762 // may be missing from closedSubstreamsInsight. This may be a little confusing to the user. 763 // We need to figure out how to include them. 
764 InsightBuilder substreamInsight = new InsightBuilder(); 765 currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight); 766 insight.appendKeyValue("committed", substreamInsight); 767 } else { 768 InsightBuilder openSubstreamsInsight = new InsightBuilder(); 769 // drainedSubstreams doesn't include all open substreams. Those which have just been created 770 // and are still catching up with buffered requests (in other words, still draining) will not 771 // show up. We think this is benign, because the draining should be typically fast, and it'd 772 // be indistinguishable from the case where those streams are to be created a little late due 773 // to delays in the timer. 774 for (Substream sub : currentState.drainedSubstreams) { 775 InsightBuilder substreamInsight = new InsightBuilder(); 776 sub.stream.appendTimeoutInsight(substreamInsight); 777 openSubstreamsInsight.append(substreamInsight); 778 } 779 insight.appendKeyValue("open", openSubstreamsInsight); 780 } 781 } 782 783 private static Random random = new Random(); 784 785 @VisibleForTesting setRandom(Random random)786 static void setRandom(Random random) { 787 RetriableStream.random = random; 788 } 789 790 /** 791 * Whether there is any potential hedge at the moment. A false return value implies there is 792 * absolutely no potential hedge. At least one of the hedges will observe a false return value 793 * when calling this method, unless otherwise the rpc is committed. 
794 */ 795 // only called when isHedging is true 796 @GuardedBy("lock") hasPotentialHedging(State state)797 private boolean hasPotentialHedging(State state) { 798 return state.winningSubstream == null 799 && state.hedgingAttemptCount < hedgingPolicy.maxAttempts 800 && !state.hedgingFrozen; 801 } 802 803 @SuppressWarnings("GuardedBy") freezeHedging()804 private void freezeHedging() { 805 Future<?> futureToBeCancelled = null; 806 synchronized (lock) { 807 if (scheduledHedging != null) { 808 // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead 809 // found: 'this.lock' 810 futureToBeCancelled = scheduledHedging.markCancelled(); 811 scheduledHedging = null; 812 } 813 state = state.freezeHedging(); 814 } 815 816 if (futureToBeCancelled != null) { 817 futureToBeCancelled.cancel(false); 818 } 819 } 820 safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata)821 private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) { 822 savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress, 823 metadata); 824 if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) { 825 listenerSerializeExecutor.execute( 826 new Runnable() { 827 @Override 828 public void run() { 829 isClosed = true; 830 masterListener.closed(status, progress, metadata); 831 } 832 }); 833 } 834 } 835 836 private static final class SavedCloseMasterListenerReason { 837 private final Status status; 838 private final RpcProgress progress; 839 private final Metadata metadata; 840 SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata)841 SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) { 842 this.status = status; 843 this.progress = progress; 844 this.metadata = metadata; 845 } 846 } 847 848 private interface BufferEntry { 849 /** Replays the buffer entry with the given stream. 
*/ runWith(Substream substream)850 void runWith(Substream substream); 851 } 852 853 private final class Sublistener implements ClientStreamListener { 854 final Substream substream; 855 Sublistener(Substream substream)856 Sublistener(Substream substream) { 857 this.substream = substream; 858 } 859 860 @Override headersRead(final Metadata headers)861 public void headersRead(final Metadata headers) { 862 if (substream.previousAttemptCount > 0) { 863 headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS); 864 headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount)); 865 } 866 commitAndRun(substream); 867 if (state.winningSubstream == substream) { 868 if (throttle != null) { 869 throttle.onSuccess(); 870 } 871 listenerSerializeExecutor.execute( 872 new Runnable() { 873 @Override 874 public void run() { 875 masterListener.headersRead(headers); 876 } 877 }); 878 } 879 } 880 881 @Override closed( final Status status, final RpcProgress rpcProgress, final Metadata trailers)882 public void closed( 883 final Status status, final RpcProgress rpcProgress, final Metadata trailers) { 884 synchronized (lock) { 885 state = state.substreamClosed(substream); 886 closedSubstreamsInsight.append(status.getCode()); 887 } 888 889 if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) { 890 assert savedCloseMasterListenerReason != null; 891 listenerSerializeExecutor.execute( 892 new Runnable() { 893 @Override 894 public void run() { 895 isClosed = true; 896 masterListener.closed(savedCloseMasterListenerReason.status, 897 savedCloseMasterListenerReason.progress, 898 savedCloseMasterListenerReason.metadata); 899 } 900 }); 901 return; 902 } 903 904 // handle a race between buffer limit exceeded and closed, when setting 905 // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream). 
906 if (substream.bufferLimitExceeded) { 907 commitAndRun(substream); 908 if (state.winningSubstream == substream) { 909 safeCloseMasterListener(status, rpcProgress, trailers); 910 } 911 return; 912 } 913 if (rpcProgress == RpcProgress.MISCARRIED 914 && localOnlyTransparentRetries.incrementAndGet() > 1_000) { 915 commitAndRun(substream); 916 if (state.winningSubstream == substream) { 917 Status tooManyTransparentRetries = Status.INTERNAL 918 .withDescription("Too many transparent retries. Might be a bug in gRPC") 919 .withCause(status.asRuntimeException()); 920 safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers); 921 } 922 return; 923 } 924 925 if (state.winningSubstream == null) { 926 if (rpcProgress == RpcProgress.MISCARRIED 927 || (rpcProgress == RpcProgress.REFUSED 928 && noMoreTransparentRetry.compareAndSet(false, true))) { 929 // transparent retry 930 final Substream newSubstream = createSubstream(substream.previousAttemptCount, true); 931 if (newSubstream == null) { 932 return; 933 } 934 if (isHedging) { 935 synchronized (lock) { 936 // Although this operation is not done atomically with 937 // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of 938 // activeHedges, so neither does it affect the commitment decision of other threads, 939 // nor do the commitment decision making threads affect itself. 940 state = state.replaceActiveHedge(substream, newSubstream); 941 } 942 } 943 944 callExecutor.execute(new Runnable() { 945 @Override 946 public void run() { 947 drain(newSubstream); 948 } 949 }); 950 return; 951 } else if (rpcProgress == RpcProgress.DROPPED) { 952 // For normal retry, nothing need be done here, will just commit. 
953 // For hedging, cancel scheduled hedge that is scheduled prior to the drop 954 if (isHedging) { 955 freezeHedging(); 956 } 957 } else { 958 noMoreTransparentRetry.set(true); 959 960 if (isHedging) { 961 HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers); 962 if (hedgingPlan.isHedgeable) { 963 pushbackHedging(hedgingPlan.hedgingPushbackMillis); 964 } 965 synchronized (lock) { 966 state = state.removeActiveHedge(substream); 967 // The invariant is whether or not #(Potential Hedge + active hedges) > 0. 968 // Once hasPotentialHedging(state) is false, it will always be false, and then 969 // #(state.activeHedges) will be decreasing. This guarantees that even there may be 970 // multiple concurrent hedges, one of the hedges will end up committed. 971 if (hedgingPlan.isHedgeable) { 972 if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { 973 return; 974 } 975 // else, no activeHedges, no new hedges possible, try to commit 976 } // else, isHedgeable is false, try to commit 977 } 978 } else { 979 RetryPlan retryPlan = makeRetryDecision(status, trailers); 980 if (retryPlan.shouldRetry) { 981 // retry 982 Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false); 983 if (newSubstream == null) { 984 return; 985 } 986 // The check state.winningSubstream == null, checking if is not already committed, is 987 // racy, but is still safe b/c the retry will also handle committed/cancellation 988 FutureCanceller scheduledRetryCopy; 989 synchronized (lock) { 990 scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); 991 } 992 class RetryBackoffRunnable implements Runnable { 993 @Override 994 public void run() { 995 callExecutor.execute( 996 new Runnable() { 997 @Override 998 public void run() { 999 drain(newSubstream); 1000 } 1001 }); 1002 } 1003 } 1004 1005 scheduledRetryCopy.setFuture( 1006 scheduledExecutorService.schedule( 1007 new RetryBackoffRunnable(), 1008 retryPlan.backoffNanos, 1009 TimeUnit.NANOSECONDS)); 
1010 return; 1011 } 1012 } 1013 } 1014 } 1015 1016 commitAndRun(substream); 1017 if (state.winningSubstream == substream) { 1018 safeCloseMasterListener(status, rpcProgress, trailers); 1019 } 1020 } 1021 1022 /** 1023 * Decides in current situation whether or not the RPC should retry and if it should retry how 1024 * long the backoff should be. The decision does not take the commitment status into account, so 1025 * caller should check it separately. It also updates the throttle. It does not change state. 1026 */ makeRetryDecision(Status status, Metadata trailer)1027 private RetryPlan makeRetryDecision(Status status, Metadata trailer) { 1028 if (retryPolicy == null) { 1029 return new RetryPlan(false, 0); 1030 } 1031 boolean shouldRetry = false; 1032 long backoffNanos = 0L; 1033 boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode()); 1034 Integer pushbackMillis = getPushbackMills(trailer); 1035 boolean isThrottled = false; 1036 if (throttle != null) { 1037 if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) { 1038 isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); 1039 } 1040 } 1041 1042 if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) { 1043 if (pushbackMillis == null) { 1044 if (isRetryableStatusCode) { 1045 shouldRetry = true; 1046 backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble()); 1047 nextBackoffIntervalNanos = Math.min( 1048 (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier), 1049 retryPolicy.maxBackoffNanos); 1050 } // else no retry 1051 } else if (pushbackMillis >= 0) { 1052 shouldRetry = true; 1053 backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis); 1054 nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos; 1055 } // else no retry 1056 } // else no retry 1057 1058 return new RetryPlan(shouldRetry, backoffNanos); 1059 } 1060 makeHedgingDecision(Status status, Metadata trailer)1061 private 
HedgingPlan makeHedgingDecision(Status status, Metadata trailer) { 1062 Integer pushbackMillis = getPushbackMills(trailer); 1063 boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); 1064 boolean isThrottled = false; 1065 if (throttle != null) { 1066 if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) { 1067 isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); 1068 } 1069 } 1070 return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis); 1071 } 1072 1073 @Nullable getPushbackMills(Metadata trailer)1074 private Integer getPushbackMills(Metadata trailer) { 1075 String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); 1076 Integer pushbackMillis = null; 1077 if (pushbackStr != null) { 1078 try { 1079 pushbackMillis = Integer.valueOf(pushbackStr); 1080 } catch (NumberFormatException e) { 1081 pushbackMillis = -1; 1082 } 1083 } 1084 return pushbackMillis; 1085 } 1086 1087 @Override messagesAvailable(final MessageProducer producer)1088 public void messagesAvailable(final MessageProducer producer) { 1089 State savedState = state; 1090 checkState( 1091 savedState.winningSubstream != null, "Headers should be received prior to messages."); 1092 if (savedState.winningSubstream != substream) { 1093 GrpcUtil.closeQuietly(producer); 1094 return; 1095 } 1096 listenerSerializeExecutor.execute( 1097 new Runnable() { 1098 @Override 1099 public void run() { 1100 masterListener.messagesAvailable(producer); 1101 } 1102 }); 1103 } 1104 1105 @Override onReady()1106 public void onReady() { 1107 // FIXME(#7089): hedging case is broken. 1108 if (!isReady()) { 1109 return; 1110 } 1111 listenerSerializeExecutor.execute( 1112 new Runnable() { 1113 @Override 1114 public void run() { 1115 if (!isClosed) { 1116 masterListener.onReady(); 1117 } 1118 } 1119 }); 1120 } 1121 } 1122 1123 private static final class State { 1124 /** Committed and the winning substream drained. 
*/ 1125 final boolean passThrough; 1126 1127 /** A list of buffered ClientStream runnables. Set to Null once passThrough. */ 1128 @Nullable final List<BufferEntry> buffer; 1129 1130 /** 1131 * Unmodifiable collection of all the open substreams that are drained. Singleton once 1132 * passThrough; Empty if committed but not passTrough. 1133 */ 1134 final Collection<Substream> drainedSubstreams; 1135 1136 /** 1137 * Unmodifiable collection of all the active hedging substreams. 1138 * 1139 * <p>A substream even with the attribute substream.closed being true may be considered still 1140 * "active" at the moment as long as it is in this collection. 1141 */ 1142 final Collection<Substream> activeHedges; // not null once isHedging = true 1143 1144 final int hedgingAttemptCount; 1145 1146 /** Null until committed. */ 1147 @Nullable final Substream winningSubstream; 1148 1149 /** Not required to set to true when cancelled, but can short-circuit the draining process. */ 1150 final boolean cancelled; 1151 1152 /** No more hedging due to events like drop or pushback. 
*/ 1153 final boolean hedgingFrozen; 1154 State( @ullable List<BufferEntry> buffer, Collection<Substream> drainedSubstreams, Collection<Substream> activeHedges, @Nullable Substream winningSubstream, boolean cancelled, boolean passThrough, boolean hedgingFrozen, int hedgingAttemptCount)1155 State( 1156 @Nullable List<BufferEntry> buffer, 1157 Collection<Substream> drainedSubstreams, 1158 Collection<Substream> activeHedges, 1159 @Nullable Substream winningSubstream, 1160 boolean cancelled, 1161 boolean passThrough, 1162 boolean hedgingFrozen, 1163 int hedgingAttemptCount) { 1164 this.buffer = buffer; 1165 this.drainedSubstreams = 1166 checkNotNull(drainedSubstreams, "drainedSubstreams"); 1167 this.winningSubstream = winningSubstream; 1168 this.activeHedges = activeHedges; 1169 this.cancelled = cancelled; 1170 this.passThrough = passThrough; 1171 this.hedgingFrozen = hedgingFrozen; 1172 this.hedgingAttemptCount = hedgingAttemptCount; 1173 1174 checkState(!passThrough || buffer == null, "passThrough should imply buffer is null"); 1175 checkState( 1176 !passThrough || winningSubstream != null, 1177 "passThrough should imply winningSubstream != null"); 1178 checkState( 1179 !passThrough 1180 || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream)) 1181 || (drainedSubstreams.size() == 0 && winningSubstream.closed), 1182 "passThrough should imply winningSubstream is drained"); 1183 checkState(!cancelled || winningSubstream != null, "cancelled should imply committed"); 1184 } 1185 1186 @CheckReturnValue 1187 // GuardedBy RetriableStream.lock cancelled()1188 State cancelled() { 1189 return new State( 1190 buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough, 1191 hedgingFrozen, hedgingAttemptCount); 1192 } 1193 1194 /** The given substream is drained. 
*/ 1195 @CheckReturnValue 1196 // GuardedBy RetriableStream.lock substreamDrained(Substream substream)1197 State substreamDrained(Substream substream) { 1198 checkState(!passThrough, "Already passThrough"); 1199 1200 Collection<Substream> drainedSubstreams; 1201 1202 if (substream.closed) { 1203 drainedSubstreams = this.drainedSubstreams; 1204 } else if (this.drainedSubstreams.isEmpty()) { 1205 // optimize for 0-retry, which is most of the cases. 1206 drainedSubstreams = Collections.singletonList(substream); 1207 } else { 1208 drainedSubstreams = new ArrayList<>(this.drainedSubstreams); 1209 drainedSubstreams.add(substream); 1210 drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams); 1211 } 1212 1213 boolean passThrough = winningSubstream != null; 1214 1215 List<BufferEntry> buffer = this.buffer; 1216 if (passThrough) { 1217 checkState( 1218 winningSubstream == substream, "Another RPC attempt has already committed"); 1219 buffer = null; 1220 } 1221 1222 return new State( 1223 buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, 1224 hedgingFrozen, hedgingAttemptCount); 1225 } 1226 1227 /** The given substream is closed. 
*/ 1228 @CheckReturnValue 1229 // GuardedBy RetriableStream.lock substreamClosed(Substream substream)1230 State substreamClosed(Substream substream) { 1231 substream.closed = true; 1232 if (this.drainedSubstreams.contains(substream)) { 1233 Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams); 1234 drainedSubstreams.remove(substream); 1235 drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams); 1236 return new State( 1237 buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, 1238 hedgingFrozen, hedgingAttemptCount); 1239 } else { 1240 return this; 1241 } 1242 } 1243 1244 @CheckReturnValue 1245 // GuardedBy RetriableStream.lock committed(Substream winningSubstream)1246 State committed(Substream winningSubstream) { 1247 checkState(this.winningSubstream == null, "Already committed"); 1248 1249 boolean passThrough = false; 1250 List<BufferEntry> buffer = this.buffer; 1251 Collection<Substream> drainedSubstreams; 1252 1253 if (this.drainedSubstreams.contains(winningSubstream)) { 1254 passThrough = true; 1255 buffer = null; 1256 drainedSubstreams = Collections.singleton(winningSubstream); 1257 } else { 1258 drainedSubstreams = Collections.emptyList(); 1259 } 1260 1261 return new State( 1262 buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, 1263 hedgingFrozen, hedgingAttemptCount); 1264 } 1265 1266 @CheckReturnValue 1267 // GuardedBy RetriableStream.lock freezeHedging()1268 State freezeHedging() { 1269 if (hedgingFrozen) { 1270 return this; 1271 } 1272 return new State( 1273 buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, 1274 true, hedgingAttemptCount); 1275 } 1276 1277 @CheckReturnValue 1278 // GuardedBy RetriableStream.lock 1279 // state.hedgingAttemptCount is modified only here. 
1280 // The method is only called in RetriableStream.start() and HedgingRunnable.run() addActiveHedge(Substream substream)1281 State addActiveHedge(Substream substream) { 1282 // hasPotentialHedging must be true 1283 checkState(!hedgingFrozen, "hedging frozen"); 1284 checkState(winningSubstream == null, "already committed"); 1285 1286 Collection<Substream> activeHedges; 1287 if (this.activeHedges == null) { 1288 activeHedges = Collections.singleton(substream); 1289 } else { 1290 activeHedges = new ArrayList<>(this.activeHedges); 1291 activeHedges.add(substream); 1292 activeHedges = Collections.unmodifiableCollection(activeHedges); 1293 } 1294 1295 int hedgingAttemptCount = this.hedgingAttemptCount + 1; 1296 return new State( 1297 buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, 1298 hedgingFrozen, hedgingAttemptCount); 1299 } 1300 1301 @CheckReturnValue 1302 // GuardedBy RetriableStream.lock 1303 // The method is only called in Sublistener.closed() removeActiveHedge(Substream substream)1304 State removeActiveHedge(Substream substream) { 1305 Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges); 1306 activeHedges.remove(substream); 1307 activeHedges = Collections.unmodifiableCollection(activeHedges); 1308 1309 return new State( 1310 buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, 1311 hedgingFrozen, hedgingAttemptCount); 1312 } 1313 1314 @CheckReturnValue 1315 // GuardedBy RetriableStream.lock 1316 // The method is only called for transparent retry. 
replaceActiveHedge(Substream oldOne, Substream newOne)1317 State replaceActiveHedge(Substream oldOne, Substream newOne) { 1318 Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges); 1319 activeHedges.remove(oldOne); 1320 activeHedges.add(newOne); 1321 activeHedges = Collections.unmodifiableCollection(activeHedges); 1322 1323 return new State( 1324 buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough, 1325 hedgingFrozen, hedgingAttemptCount); 1326 } 1327 } 1328 1329 /** 1330 * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful 1331 * attributes. 1332 */ 1333 private static final class Substream { 1334 ClientStream stream; 1335 1336 // GuardedBy RetriableStream.lock 1337 boolean closed; 1338 1339 // setting to true must be GuardedBy RetriableStream.lock 1340 boolean bufferLimitExceeded; 1341 1342 final int previousAttemptCount; 1343 Substream(int previousAttemptCount)1344 Substream(int previousAttemptCount) { 1345 this.previousAttemptCount = previousAttemptCount; 1346 } 1347 } 1348 1349 1350 /** 1351 * Traces the buffer used by a substream. 1352 */ 1353 class BufferSizeTracer extends ClientStreamTracer { 1354 // Each buffer size tracer is dedicated to one specific substream. 1355 private final Substream substream; 1356 1357 @GuardedBy("lock") 1358 long bufferNeeded; 1359 BufferSizeTracer(Substream substream)1360 BufferSizeTracer(Substream substream) { 1361 this.substream = substream; 1362 } 1363 1364 /** 1365 * A message is sent to the wire, so its reference would be released if no retry or 1366 * hedging were involved. So at this point we have to hold the reference of the message longer 1367 * for retry, and we need to increment {@code substream.bufferNeeded}. 
1368 */ 1369 @Override outboundWireSize(long bytes)1370 public void outboundWireSize(long bytes) { 1371 if (state.winningSubstream != null) { 1372 return; 1373 } 1374 1375 Runnable postCommitTask = null; 1376 1377 // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound. 1378 synchronized (lock) { 1379 if (state.winningSubstream != null || substream.closed) { 1380 return; 1381 } 1382 bufferNeeded += bytes; 1383 if (bufferNeeded <= perRpcBufferUsed) { 1384 return; 1385 } 1386 1387 if (bufferNeeded > perRpcBufferLimit) { 1388 substream.bufferLimitExceeded = true; 1389 } else { 1390 // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit. 1391 long savedChannelBufferUsed = 1392 channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed); 1393 perRpcBufferUsed = bufferNeeded; 1394 1395 if (savedChannelBufferUsed > channelBufferLimit) { 1396 substream.bufferLimitExceeded = true; 1397 } 1398 } 1399 1400 if (substream.bufferLimitExceeded) { 1401 postCommitTask = commit(substream); 1402 } 1403 } 1404 1405 if (postCommitTask != null) { 1406 postCommitTask.run(); 1407 } 1408 } 1409 } 1410 1411 /** 1412 * Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for 1413 * the Channel. There should be a single instance of it for each channel. 1414 */ 1415 static final class ChannelBufferMeter { 1416 private final AtomicLong bufferUsed = new AtomicLong(); 1417 1418 @VisibleForTesting addAndGet(long newBytesUsed)1419 long addAndGet(long newBytesUsed) { 1420 return bufferUsed.addAndGet(newBytesUsed); 1421 } 1422 } 1423 1424 /** 1425 * Used for retry throttling. 1426 */ 1427 static final class Throttle { 1428 1429 private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000; 1430 1431 /** 1432 * 1000 times the maxTokens field of the retryThrottling policy in service config. 1433 * The number of tokens starts at maxTokens. The token_count will always be between 0 and 1434 * maxTokens. 
1435 */ 1436 final int maxTokens; 1437 1438 /** 1439 * Half of {@code maxTokens}. 1440 */ 1441 final int threshold; 1442 1443 /** 1444 * 1000 times the tokenRatio field of the retryThrottling policy in service config. 1445 */ 1446 final int tokenRatio; 1447 1448 final AtomicInteger tokenCount = new AtomicInteger(); 1449 Throttle(float maxTokens, float tokenRatio)1450 Throttle(float maxTokens, float tokenRatio) { 1451 // tokenRatio is up to 3 decimal places 1452 this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP); 1453 this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP); 1454 this.threshold = this.maxTokens / 2; 1455 tokenCount.set(this.maxTokens); 1456 } 1457 1458 @VisibleForTesting isAboveThreshold()1459 boolean isAboveThreshold() { 1460 return tokenCount.get() > threshold; 1461 } 1462 1463 /** 1464 * Counts down the token on qualified failure and checks if it is above the threshold 1465 * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with 1466 * a not-to-retry pushback. 
1467 */ 1468 @VisibleForTesting onQualifiedFailureThenCheckIsAboveThreshold()1469 boolean onQualifiedFailureThenCheckIsAboveThreshold() { 1470 while (true) { 1471 int currentCount = tokenCount.get(); 1472 if (currentCount == 0) { 1473 return false; 1474 } 1475 int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP); 1476 boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0)); 1477 if (updated) { 1478 return decremented > threshold; 1479 } 1480 } 1481 } 1482 1483 @VisibleForTesting onSuccess()1484 void onSuccess() { 1485 while (true) { 1486 int currentCount = tokenCount.get(); 1487 if (currentCount == maxTokens) { 1488 break; 1489 } 1490 int incremented = currentCount + tokenRatio; 1491 boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens)); 1492 if (updated) { 1493 break; 1494 } 1495 } 1496 } 1497 1498 @Override equals(Object o)1499 public boolean equals(Object o) { 1500 if (this == o) { 1501 return true; 1502 } 1503 if (!(o instanceof Throttle)) { 1504 return false; 1505 } 1506 Throttle that = (Throttle) o; 1507 return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio; 1508 } 1509 1510 @Override hashCode()1511 public int hashCode() { 1512 return Objects.hashCode(maxTokens, tokenRatio); 1513 } 1514 } 1515 1516 private static final class RetryPlan { 1517 final boolean shouldRetry; 1518 final long backoffNanos; 1519 RetryPlan(boolean shouldRetry, long backoffNanos)1520 RetryPlan(boolean shouldRetry, long backoffNanos) { 1521 this.shouldRetry = shouldRetry; 1522 this.backoffNanos = backoffNanos; 1523 } 1524 } 1525 1526 private static final class HedgingPlan { 1527 final boolean isHedgeable; 1528 @Nullable 1529 final Integer hedgingPushbackMillis; 1530 HedgingPlan( boolean isHedgeable, @Nullable Integer hedgingPushbackMillis)1531 public HedgingPlan( 1532 boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) { 1533 this.isHedgeable = isHedgeable; 1534 
this.hedgingPushbackMillis = hedgingPushbackMillis;
    }
  }

  /** Allows cancelling a Future without racing with setting the future. */
  private static final class FutureCanceller {

    final Object lock;
    @GuardedBy("lock")
    Future<?> future;
    @GuardedBy("lock")
    boolean cancelled;

    FutureCanceller(Object lock) {
      this.lock = lock;
    }

    /**
     * Stores the future under {@code lock}, unless this canceller has already been marked
     * cancelled, in which case the future is not stored.
     */
    void setFuture(Future<?> future) {
      synchronized (lock) {
        if (!cancelled) {
          this.future = future;
        }
      }
    }

    /**
     * Marks this canceller as cancelled.
     *
     * @return the future stored so far, or {@code null} if none has been set
     */
    @GuardedBy("lock")
    @CheckForNull // Must cancel the returned future if not null.
    Future<?> markCancelled() {
      cancelled = true;
      return future;
    }

    /** Returns whether {@link #markCancelled()} has been called. */
    @GuardedBy("lock")
    boolean isCancelled() {
      return cancelled;
    }
  }
}