• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Objects;
25 import io.grpc.Attributes;
26 import io.grpc.ClientStreamTracer;
27 import io.grpc.Compressor;
28 import io.grpc.Deadline;
29 import io.grpc.DecompressorRegistry;
30 import io.grpc.Metadata;
31 import io.grpc.MethodDescriptor;
32 import io.grpc.Status;
33 import io.grpc.SynchronizationContext;
34 import io.grpc.internal.ClientStreamListener.RpcProgress;
35 import java.io.InputStream;
36 import java.lang.Thread.UncaughtExceptionHandler;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.Random;
42 import java.util.concurrent.Executor;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicLong;
49 import javax.annotation.CheckForNull;
50 import javax.annotation.CheckReturnValue;
51 import javax.annotation.Nullable;
52 import javax.annotation.concurrent.GuardedBy;
53 
54 /** A logical {@link ClientStream} that is retriable. */
55 abstract class RetriableStream<ReqT> implements ClientStream {
56   @VisibleForTesting
57   static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
58       Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);
59 
60   @VisibleForTesting
61   static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
62       Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);
63 
64   private static final Status CANCELLED_BECAUSE_COMMITTED =
65       Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
66 
67   private final MethodDescriptor<ReqT, ?> method;
68   private final Executor callExecutor;
69   private final Executor listenerSerializeExecutor = new SynchronizationContext(
70       new UncaughtExceptionHandler() {
71         @Override
72         public void uncaughtException(Thread t, Throwable e) {
73           throw Status.fromThrowable(e)
74               .withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
75               .asRuntimeException();
76         }
77       }
78   );
79   private final ScheduledExecutorService scheduledExecutorService;
80   // Must not modify it.
81   private final Metadata headers;
82   @Nullable
83   private final RetryPolicy retryPolicy;
84   @Nullable
85   private final HedgingPolicy hedgingPolicy;
86   private final boolean isHedging;
87 
88   /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
89   private final Object lock = new Object();
90 
91   private final ChannelBufferMeter channelBufferUsed;
92   private final long perRpcBufferLimit;
93   private final long channelBufferLimit;
94   @Nullable
95   private final Throttle throttle;
96   @GuardedBy("lock")
97   private final InsightBuilder closedSubstreamsInsight = new InsightBuilder();
98 
99   private volatile State state = new State(
100       new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, null, false, false,
101       false, 0);
102 
103   /**
104    * Either non-local transparent retry happened or reached server's application logic.
105    *
106    * <p>Note that local-only transparent retries are unlimited.
107    */
108   private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
109   private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
110   private final AtomicInteger inFlightSubStreams = new AtomicInteger();
111   private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112 
113   // Used for recording the share of buffer used for the current call out of the channel buffer.
114   // This field would not be necessary if there is no channel buffer limit.
115   @GuardedBy("lock")
116   private long perRpcBufferUsed;
117 
118   private ClientStreamListener masterListener;
119   @GuardedBy("lock")
120   private FutureCanceller scheduledRetry;
121   @GuardedBy("lock")
122   private FutureCanceller scheduledHedging;
123   private long nextBackoffIntervalNanos;
124   private Status cancellationStatus;
125   private boolean isClosed;
126 
/**
 * Creates a retriable stream. At most one of {@code retryPolicy} and {@code hedgingPolicy} may be
 * supplied; when a retry policy is given, the first backoff starts at its initial backoff.
 *
 * @throws IllegalArgumentException if both a retry policy and a hedging policy are supplied
 */
RetriableStream(
    MethodDescriptor<ReqT, ?> method, Metadata headers,
    ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
    Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
    @Nullable RetryPolicy retryPolicy, @Nullable HedgingPolicy hedgingPolicy,
    @Nullable Throttle throttle) {
  checkArgument(
      retryPolicy == null || hedgingPolicy == null,
      "Should not provide both retryPolicy and hedgingPolicy");
  this.method = method;
  this.headers = headers;
  this.channelBufferUsed = channelBufferUsed;
  this.perRpcBufferLimit = perRpcBufferLimit;
  this.channelBufferLimit = channelBufferLimit;
  this.callExecutor = callExecutor;
  this.scheduledExecutorService = scheduledExecutorService;
  this.retryPolicy = retryPolicy;
  this.hedgingPolicy = hedgingPolicy;
  this.throttle = throttle;
  this.isHedging = hedgingPolicy != null;
  this.nextBackoffIntervalNanos = retryPolicy == null ? 0L : retryPolicy.initialBackoffNanos;
}
151 
152   @SuppressWarnings("GuardedBy")
153   @Nullable // null if already committed
154   @CheckReturnValue
commit(final Substream winningSubstream)155   private Runnable commit(final Substream winningSubstream) {
156 
157     synchronized (lock) {
158       if (state.winningSubstream != null) {
159         return null;
160       }
161       final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
162 
163       state = state.committed(winningSubstream);
164 
165       // subtract the share of this RPC from channelBufferUsed.
166       channelBufferUsed.addAndGet(-perRpcBufferUsed);
167 
168       final Future<?> retryFuture;
169       if (scheduledRetry != null) {
170         // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171         // found: 'this.lock'
172         retryFuture = scheduledRetry.markCancelled();
173         scheduledRetry = null;
174       } else {
175         retryFuture = null;
176       }
177       // cancel the scheduled hedging if it is scheduled prior to the commitment
178       final Future<?> hedgingFuture;
179       if (scheduledHedging != null) {
180         // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181         // found: 'this.lock'
182         hedgingFuture = scheduledHedging.markCancelled();
183         scheduledHedging = null;
184       } else {
185         hedgingFuture = null;
186       }
187 
188       class CommitTask implements Runnable {
189         @Override
190         public void run() {
191           // For hedging only, not needed for normal retry
192           for (Substream substream : savedDrainedSubstreams) {
193             if (substream != winningSubstream) {
194               substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
195             }
196           }
197           if (retryFuture != null) {
198             retryFuture.cancel(false);
199           }
200           if (hedgingFuture != null) {
201             hedgingFuture.cancel(false);
202           }
203 
204           postCommit();
205         }
206       }
207 
208       return new CommitTask();
209     }
210   }
211 
postCommit()212   abstract void postCommit();
213 
214   /**
215    * Calls commit() and if successful runs the post commit task.
216    */
commitAndRun(Substream winningSubstream)217   private void commitAndRun(Substream winningSubstream) {
218     Runnable postCommitTask = commit(winningSubstream);
219 
220     if (postCommitTask != null) {
221       postCommitTask.run();
222     }
223   }
224 
225   // returns null means we should not create new sub streams, e.g. cancelled or
226   // other close condition is met for retriableStream.
227   @Nullable
createSubstream(int previousAttemptCount, boolean isTransparentRetry)228   private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
229     int inFlight;
230     do {
231       inFlight = inFlightSubStreams.get();
232       if (inFlight < 0) {
233         return null;
234       }
235     } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
236     Substream sub = new Substream(previousAttemptCount);
237     // one tracer per substream
238     final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
239     ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
240       @Override
241       public ClientStreamTracer newClientStreamTracer(
242           ClientStreamTracer.StreamInfo info, Metadata headers) {
243         return bufferSizeTracer;
244       }
245     };
246 
247     Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
248     // NOTICE: This set _must_ be done before stream.start() and it actually is.
249     sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
250     return sub;
251   }
252 
253   /**
254    * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
255    * Client stream is not yet started.
256    */
newSubstream( Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts, boolean isTransparentRetry)257   abstract ClientStream newSubstream(
258       Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
259       boolean isTransparentRetry);
260 
261   /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
262   @VisibleForTesting
updateHeaders( Metadata originalHeaders, int previousAttemptCount)263   final Metadata updateHeaders(
264       Metadata originalHeaders, int previousAttemptCount) {
265     Metadata newHeaders = new Metadata();
266     newHeaders.merge(originalHeaders);
267     if (previousAttemptCount > 0) {
268       newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttemptCount));
269     }
270     return newHeaders;
271   }
272 
/**
 * Replays the buffered calls onto {@code substream} until it has caught up with the buffer.
 *
 * <p>Entries are copied out of {@code state.buffer} in chunks while holding {@code lock} and are
 * replayed outside the lock. Replay stops early if the RPC commits to a different substream or is
 * cancelled; in that case the substream is started if needed and then cancelled. Once the
 * substream has caught up, it is recorded via state.substreamDrained(). If the stream
 * isReady(), masterListener.onReady() is then scheduled on listenerSerializeExecutor.
 */
drain(Substream substream)273   private void drain(Substream substream) {
274     int index = 0;
    // Maximum number of buffer entries copied per lock acquisition.
275     int chunk = 0x80;
276     List<BufferEntry> list = null;
277     boolean streamStarted = false;
278     Runnable onReadyRunnable = null;
279 
280     while (true) {
281       State savedState;
282 
      // Under the lock: decide whether to stop, and otherwise copy the next chunk of entries.
283       synchronized (lock) {
284         savedState = state;
285         if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
286           // committed but not me, to be cancelled
287           break;
288         }
289         if (savedState.cancelled) {
290           break;
291         }
292         if (index == savedState.buffer.size()) { // I'm drained
          // NOTE(review): substreamDrained() presumably adds this substream to
          // state.drainedSubstreams, which delayOrExecute() applies new entries to directly.
293           state = savedState.substreamDrained(substream);
294           if (!isReady()) {
295             return;
296           }
297           onReadyRunnable = new Runnable() {
298             @Override
299             public void run() {
300               if (!isClosed) {
301                 masterListener.onReady();
302               }
303             }
304           };
305           break;
306         }
307 
        // Already closed: stop replaying, and skip the start/cancel below.
308         if (substream.closed) {
309           return;
310         }
311 
312         int stop = Math.min(index + chunk, savedState.buffer.size());
313         if (list == null) {
314           list = new ArrayList<>(savedState.buffer.subList(index, stop));
315         } else {
316           list.clear();
317           list.addAll(savedState.buffer.subList(index, stop));
318         }
319         index = stop;
320       }
321 
      // Replay outside the lock. Re-read the volatile state after each entry so that a commit to
      // another substream or a cancellation stops replay promptly. The next iteration of the while
      // loop re-checks under the lock and exits.
322       for (BufferEntry bufferEntry : list) {
323         bufferEntry.runWith(substream);
324         if (bufferEntry instanceof RetriableStream.StartEntry) {
325           streamStarted = true;
326         }
327         savedState = state;
328         if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
329           // committed but not me, to be cancelled
330           break;
331         }
332         if (savedState.cancelled) {
333           break;
334         }
335       }
336     }
337 
    // Reached only via a break out of the while loop: either drained-and-ready, or the substream
    // must be abandoned.
338     if (onReadyRunnable != null) {
339       listenerSerializeExecutor.execute(onReadyRunnable);
340       return;
341     }
342 
343     if (!streamStarted) {
344       // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
345       substream.stream.start(new Sublistener(substream));
346     }
    // The winner (cancelled RPC) uses the status recorded by cancel(); losers get
    // CANCELLED_BECAUSE_COMMITTED.
347     substream.stream.cancel(
348         state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
349   }
350 
351   /**
352    * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
353    */
354   @CheckReturnValue
355   @Nullable
prestart()356   abstract Status prestart();
357 
358   class StartEntry implements BufferEntry {
359     @Override
runWith(Substream substream)360     public void runWith(Substream substream) {
361       substream.stream.start(new Sublistener(substream));
362     }
363   }
364 
365   /** Starts the first PRC attempt. */
366   @Override
start(ClientStreamListener listener)367   public final void start(ClientStreamListener listener) {
368     masterListener = listener;
369 
370     Status shutdownStatus = prestart();
371 
372     if (shutdownStatus != null) {
373       cancel(shutdownStatus);
374       return;
375     }
376 
377     synchronized (lock) {
378       state.buffer.add(new StartEntry());
379     }
380 
381     Substream substream = createSubstream(0, false);
382     if (substream == null) {
383       return;
384     }
385     if (isHedging) {
386       FutureCanceller scheduledHedgingRef = null;
387 
388       synchronized (lock) {
389         state = state.addActiveHedge(substream);
390         if (hasPotentialHedging(state)
391             && (throttle == null || throttle.isAboveThreshold())) {
392           scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
393         }
394       }
395 
396       if (scheduledHedgingRef != null) {
397         scheduledHedgingRef.setFuture(
398             scheduledExecutorService.schedule(
399                 new HedgingRunnable(scheduledHedgingRef),
400                 hedgingPolicy.hedgingDelayNanos,
401                 TimeUnit.NANOSECONDS));
402       }
403     }
404 
405     drain(substream);
406   }
407 
408   @SuppressWarnings("GuardedBy")
pushbackHedging(@ullable Integer delayMillis)409   private void pushbackHedging(@Nullable Integer delayMillis) {
410     if (delayMillis == null) {
411       return;
412     }
413     if (delayMillis < 0) {
414       freezeHedging();
415       return;
416     }
417 
418     // Cancels the current scheduledHedging and reschedules a new one.
419     FutureCanceller future;
420     Future<?> futureToBeCancelled;
421 
422     synchronized (lock) {
423       if (scheduledHedging == null) {
424         return;
425       }
426 
427       // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
428       // found: 'this.lock'
429       futureToBeCancelled = scheduledHedging.markCancelled();
430       scheduledHedging = future = new FutureCanceller(lock);
431     }
432 
433     if (futureToBeCancelled != null) {
434       futureToBeCancelled.cancel(false);
435     }
436     future.setFuture(scheduledExecutorService.schedule(
437         new HedgingRunnable(future), delayMillis, TimeUnit.MILLISECONDS));
438   }
439 
440   private final class HedgingRunnable implements Runnable {
441 
442     // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
443     // by a positive push-back just after newSubstream is instantiated, so that we can double check.
444     final FutureCanceller scheduledHedgingRef;
445 
HedgingRunnable(FutureCanceller scheduledHedging)446     HedgingRunnable(FutureCanceller scheduledHedging) {
447       scheduledHedgingRef = scheduledHedging;
448     }
449 
450     @Override
run()451     public void run() {
452       // It's safe to read state.hedgingAttemptCount here.
453       // If this run is not cancelled, the value of state.hedgingAttemptCount won't change
454       // until state.addActiveHedge() is called subsequently, even the state could possibly
455       // change.
456       Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
457       if (newSubstream == null) {
458         return;
459       }
460       callExecutor.execute(
461           new Runnable() {
462             @SuppressWarnings("GuardedBy")
463             @Override
464             public void run() {
465               boolean cancelled = false;
466               FutureCanceller future = null;
467 
468               synchronized (lock) {
469                 // TODO(b/145386688): This access should be guarded by
470                 // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
471                 // 'RetriableStream.this.lock'
472                 if (scheduledHedgingRef.isCancelled()) {
473                   cancelled = true;
474                 } else {
475                   state = state.addActiveHedge(newSubstream);
476                   if (hasPotentialHedging(state)
477                       && (throttle == null || throttle.isAboveThreshold())) {
478                     scheduledHedging = future = new FutureCanceller(lock);
479                   } else {
480                     state = state.freezeHedging();
481                     scheduledHedging = null;
482                   }
483                 }
484               }
485 
486               if (cancelled) {
487                 // Start stream so inFlightSubStreams is decremented in Sublistener.closed()
488                 newSubstream.stream.start(new Sublistener(newSubstream));
489                 newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
490                 return;
491               }
492               if (future != null) {
493                 future.setFuture(
494                     scheduledExecutorService.schedule(
495                         new HedgingRunnable(future),
496                         hedgingPolicy.hedgingDelayNanos,
497                         TimeUnit.NANOSECONDS));
498               }
499               drain(newSubstream);
500             }
501           });
502     }
503   }
504 
505   @Override
cancel(final Status reason)506   public final void cancel(final Status reason) {
507     Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
508     noopSubstream.stream = new NoopClientStream();
509     Runnable runnable = commit(noopSubstream);
510 
511     if (runnable != null) {
512       synchronized (lock) {
513         state = state.substreamDrained(noopSubstream);
514       }
515       runnable.run();
516       safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
517       return;
518     }
519 
520     Substream winningSubstreamToCancel = null;
521     synchronized (lock) {
522       if (state.drainedSubstreams.contains(state.winningSubstream)) {
523         winningSubstreamToCancel = state.winningSubstream;
524       } else { // the winningSubstream will be cancelled while draining
525         cancellationStatus = reason;
526       }
527       state = state.cancelled();
528     }
529     if (winningSubstreamToCancel != null) {
530       winningSubstreamToCancel.stream.cancel(reason);
531     }
532   }
533 
delayOrExecute(BufferEntry bufferEntry)534   private void delayOrExecute(BufferEntry bufferEntry) {
535     Collection<Substream> savedDrainedSubstreams;
536     synchronized (lock) {
537       if (!state.passThrough) {
538         state.buffer.add(bufferEntry);
539       }
540       savedDrainedSubstreams = state.drainedSubstreams;
541     }
542 
543     for (Substream substream : savedDrainedSubstreams) {
544       bufferEntry.runWith(substream);
545     }
546   }
547 
548   /**
549    * Do not use it directly. Use {@link #sendMessage(Object)} instead because we don't use
550    * InputStream for buffering.
551    */
552   @Override
writeMessage(InputStream message)553   public final void writeMessage(InputStream message) {
554     throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
555   }
556 
sendMessage(final ReqT message)557   final void sendMessage(final ReqT message) {
558     State savedState = state;
559     if (savedState.passThrough) {
560       savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
561       return;
562     }
563 
564     class SendMessageEntry implements BufferEntry {
565       @Override
566       public void runWith(Substream substream) {
567         substream.stream.writeMessage(method.streamRequest(message));
568         // TODO(ejona): Workaround Netty memory leak. Message writes always need to be followed by
569         // flushes (or half close), but retry appears to have a code path that the flushes may
570         // not happen. The code needs to be fixed and this removed. See #9340.
571         substream.stream.flush();
572       }
573     }
574 
575     delayOrExecute(new SendMessageEntry());
576   }
577 
578   @Override
request(final int numMessages)579   public final void request(final int numMessages) {
580     State savedState = state;
581     if (savedState.passThrough) {
582       savedState.winningSubstream.stream.request(numMessages);
583       return;
584     }
585 
586     class RequestEntry implements BufferEntry {
587       @Override
588       public void runWith(Substream substream) {
589         substream.stream.request(numMessages);
590       }
591     }
592 
593     delayOrExecute(new RequestEntry());
594   }
595 
596   @Override
flush()597   public final void flush() {
598     State savedState = state;
599     if (savedState.passThrough) {
600       savedState.winningSubstream.stream.flush();
601       return;
602     }
603 
604     class FlushEntry implements BufferEntry {
605       @Override
606       public void runWith(Substream substream) {
607         substream.stream.flush();
608       }
609     }
610 
611     delayOrExecute(new FlushEntry());
612   }
613 
614   @Override
isReady()615   public final boolean isReady() {
616     for (Substream substream : state.drainedSubstreams) {
617       if (substream.stream.isReady()) {
618         return true;
619       }
620     }
621     return false;
622   }
623 
624   @Override
optimizeForDirectExecutor()625   public void optimizeForDirectExecutor() {
626     class OptimizeDirectEntry implements BufferEntry {
627       @Override
628       public void runWith(Substream substream) {
629         substream.stream.optimizeForDirectExecutor();
630       }
631     }
632 
633     delayOrExecute(new OptimizeDirectEntry());
634   }
635 
636   @Override
setCompressor(final Compressor compressor)637   public final void setCompressor(final Compressor compressor) {
638     class CompressorEntry implements BufferEntry {
639       @Override
640       public void runWith(Substream substream) {
641         substream.stream.setCompressor(compressor);
642       }
643     }
644 
645     delayOrExecute(new CompressorEntry());
646   }
647 
648   @Override
setFullStreamDecompression(final boolean fullStreamDecompression)649   public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
650     class FullStreamDecompressionEntry implements BufferEntry {
651       @Override
652       public void runWith(Substream substream) {
653         substream.stream.setFullStreamDecompression(fullStreamDecompression);
654       }
655     }
656 
657     delayOrExecute(new FullStreamDecompressionEntry());
658   }
659 
660   @Override
setMessageCompression(final boolean enable)661   public final void setMessageCompression(final boolean enable) {
662     class MessageCompressionEntry implements BufferEntry {
663       @Override
664       public void runWith(Substream substream) {
665         substream.stream.setMessageCompression(enable);
666       }
667     }
668 
669     delayOrExecute(new MessageCompressionEntry());
670   }
671 
672   @Override
halfClose()673   public final void halfClose() {
674     class HalfCloseEntry implements BufferEntry {
675       @Override
676       public void runWith(Substream substream) {
677         substream.stream.halfClose();
678       }
679     }
680 
681     delayOrExecute(new HalfCloseEntry());
682   }
683 
684   @Override
setAuthority(final String authority)685   public final void setAuthority(final String authority) {
686     class AuthorityEntry implements BufferEntry {
687       @Override
688       public void runWith(Substream substream) {
689         substream.stream.setAuthority(authority);
690       }
691     }
692 
693     delayOrExecute(new AuthorityEntry());
694   }
695 
696   @Override
setDecompressorRegistry(final DecompressorRegistry decompressorRegistry)697   public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
698     class DecompressorRegistryEntry implements BufferEntry {
699       @Override
700       public void runWith(Substream substream) {
701         substream.stream.setDecompressorRegistry(decompressorRegistry);
702       }
703     }
704 
705     delayOrExecute(new DecompressorRegistryEntry());
706   }
707 
708   @Override
setMaxInboundMessageSize(final int maxSize)709   public final void setMaxInboundMessageSize(final int maxSize) {
710     class MaxInboundMessageSizeEntry implements BufferEntry {
711       @Override
712       public void runWith(Substream substream) {
713         substream.stream.setMaxInboundMessageSize(maxSize);
714       }
715     }
716 
717     delayOrExecute(new MaxInboundMessageSizeEntry());
718   }
719 
720   @Override
setMaxOutboundMessageSize(final int maxSize)721   public final void setMaxOutboundMessageSize(final int maxSize) {
722     class MaxOutboundMessageSizeEntry implements BufferEntry {
723       @Override
724       public void runWith(Substream substream) {
725         substream.stream.setMaxOutboundMessageSize(maxSize);
726       }
727     }
728 
729     delayOrExecute(new MaxOutboundMessageSizeEntry());
730   }
731 
732   @Override
setDeadline(final Deadline deadline)733   public final void setDeadline(final Deadline deadline) {
734     class DeadlineEntry implements BufferEntry {
735       @Override
736       public void runWith(Substream substream) {
737         substream.stream.setDeadline(deadline);
738       }
739     }
740 
741     delayOrExecute(new DeadlineEntry());
742   }
743 
744   @Override
getAttributes()745   public final Attributes getAttributes() {
746     if (state.winningSubstream != null) {
747       return state.winningSubstream.stream.getAttributes();
748     }
749     return Attributes.EMPTY;
750   }
751 
752   @Override
appendTimeoutInsight(InsightBuilder insight)753   public void appendTimeoutInsight(InsightBuilder insight) {
754     State currentState;
755     synchronized (lock) {
756       insight.appendKeyValue("closed", closedSubstreamsInsight);
757       currentState = state;
758     }
759     if (currentState.winningSubstream != null) {
760       // TODO(zhangkun83): in this case while other drained substreams have been cancelled in favor
761       // of the winning substream, they may not have received closed() notifications yet, thus they
762       // may be missing from closedSubstreamsInsight.  This may be a little confusing to the user.
763       // We need to figure out how to include them.
764       InsightBuilder substreamInsight = new InsightBuilder();
765       currentState.winningSubstream.stream.appendTimeoutInsight(substreamInsight);
766       insight.appendKeyValue("committed", substreamInsight);
767     } else {
768       InsightBuilder openSubstreamsInsight = new InsightBuilder();
769       // drainedSubstreams doesn't include all open substreams.  Those which have just been created
770       // and are still catching up with buffered requests (in other words, still draining) will not
771       // show up.  We think this is benign, because the draining should be typically fast, and it'd
772       // be indistinguishable from the case where those streams are to be created a little late due
773       // to delays in the timer.
774       for (Substream sub : currentState.drainedSubstreams) {
775         InsightBuilder substreamInsight = new InsightBuilder();
776         sub.stream.appendTimeoutInsight(substreamInsight);
777         openSubstreamsInsight.append(substreamInsight);
778       }
779       insight.appendKeyValue("open", openSubstreamsInsight);
780     }
781   }
782 
783   private static Random random = new Random();
784 
785   @VisibleForTesting
setRandom(Random random)786   static void setRandom(Random random) {
787     RetriableStream.random = random;
788   }
789 
790   /**
791    * Whether there is any potential hedge at the moment. A false return value implies there is
792    * absolutely no potential hedge. At least one of the hedges will observe a false return value
793    * when calling this method, unless otherwise the rpc is committed.
794    */
795   // only called when isHedging is true
796   @GuardedBy("lock")
hasPotentialHedging(State state)797   private boolean hasPotentialHedging(State state) {
798     return state.winningSubstream == null
799         && state.hedgingAttemptCount < hedgingPolicy.maxAttempts
800         && !state.hedgingFrozen;
801   }
802 
803   @SuppressWarnings("GuardedBy")
freezeHedging()804   private void freezeHedging() {
805     Future<?> futureToBeCancelled = null;
806     synchronized (lock) {
807       if (scheduledHedging != null) {
808         // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
809         // found: 'this.lock'
810         futureToBeCancelled = scheduledHedging.markCancelled();
811         scheduledHedging = null;
812       }
813       state = state.freezeHedging();
814     }
815 
816     if (futureToBeCancelled != null) {
817       futureToBeCancelled.cancel(false);
818     }
819   }
820 
safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata)821   private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
822     savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
823         metadata);
824     if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
825       listenerSerializeExecutor.execute(
826           new Runnable() {
827             @Override
828             public void run() {
829               isClosed = true;
830               masterListener.closed(status, progress, metadata);
831             }
832           });
833     }
834   }
835 
836   private static final class SavedCloseMasterListenerReason {
837     private final Status status;
838     private final RpcProgress progress;
839     private final Metadata metadata;
840 
SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata)841     SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
842       this.status = status;
843       this.progress = progress;
844       this.metadata = metadata;
845     }
846   }
847 
848   private interface BufferEntry {
849     /** Replays the buffer entry with the given stream. */
runWith(Substream substream)850     void runWith(Substream substream);
851   }
852 
853   private final class Sublistener implements ClientStreamListener {
854     final Substream substream;
855 
Sublistener(Substream substream)856     Sublistener(Substream substream) {
857       this.substream = substream;
858     }
859 
860     @Override
headersRead(final Metadata headers)861     public void headersRead(final Metadata headers) {
862       if (substream.previousAttemptCount > 0) {
863         headers.discardAll(GRPC_PREVIOUS_RPC_ATTEMPTS);
864         headers.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(substream.previousAttemptCount));
865       }
866       commitAndRun(substream);
867       if (state.winningSubstream == substream) {
868         if (throttle != null) {
869           throttle.onSuccess();
870         }
871         listenerSerializeExecutor.execute(
872             new Runnable() {
873               @Override
874               public void run() {
875                 masterListener.headersRead(headers);
876               }
877             });
878       }
879     }
880 
881     @Override
closed( final Status status, final RpcProgress rpcProgress, final Metadata trailers)882     public void closed(
883         final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
884       synchronized (lock) {
885         state = state.substreamClosed(substream);
886         closedSubstreamsInsight.append(status.getCode());
887       }
888 
889       if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
890         assert savedCloseMasterListenerReason != null;
891         listenerSerializeExecutor.execute(
892             new Runnable() {
893               @Override
894               public void run() {
895                 isClosed = true;
896                 masterListener.closed(savedCloseMasterListenerReason.status,
897                     savedCloseMasterListenerReason.progress,
898                     savedCloseMasterListenerReason.metadata);
899               }
900             });
901         return;
902       }
903 
904       // handle a race between buffer limit exceeded and closed, when setting
905       // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
906       if (substream.bufferLimitExceeded) {
907         commitAndRun(substream);
908         if (state.winningSubstream == substream) {
909           safeCloseMasterListener(status, rpcProgress, trailers);
910         }
911         return;
912       }
913       if (rpcProgress == RpcProgress.MISCARRIED
914           && localOnlyTransparentRetries.incrementAndGet() > 1_000) {
915         commitAndRun(substream);
916         if (state.winningSubstream == substream) {
917           Status tooManyTransparentRetries = Status.INTERNAL
918               .withDescription("Too many transparent retries. Might be a bug in gRPC")
919               .withCause(status.asRuntimeException());
920           safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
921         }
922         return;
923       }
924 
925       if (state.winningSubstream == null) {
926         if (rpcProgress == RpcProgress.MISCARRIED
927             || (rpcProgress == RpcProgress.REFUSED
928                 && noMoreTransparentRetry.compareAndSet(false, true))) {
929           // transparent retry
930           final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
931           if (newSubstream == null) {
932             return;
933           }
934           if (isHedging) {
935             synchronized (lock) {
936               // Although this operation is not done atomically with
937               // noMoreTransparentRetry.compareAndSet(false, true), it does not change the size() of
938               // activeHedges, so neither does it affect the commitment decision of other threads,
939               // nor do the commitment decision making threads affect itself.
940               state = state.replaceActiveHedge(substream, newSubstream);
941             }
942           }
943 
944           callExecutor.execute(new Runnable() {
945             @Override
946             public void run() {
947               drain(newSubstream);
948             }
949           });
950           return;
951         } else if (rpcProgress == RpcProgress.DROPPED) {
952           // For normal retry, nothing need be done here, will just commit.
953           // For hedging, cancel scheduled hedge that is scheduled prior to the drop
954           if (isHedging) {
955             freezeHedging();
956           }
957         } else {
958           noMoreTransparentRetry.set(true);
959 
960           if (isHedging) {
961             HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
962             if (hedgingPlan.isHedgeable) {
963               pushbackHedging(hedgingPlan.hedgingPushbackMillis);
964             }
965             synchronized (lock) {
966               state = state.removeActiveHedge(substream);
967               // The invariant is whether or not #(Potential Hedge + active hedges) > 0.
968               // Once hasPotentialHedging(state) is false, it will always be false, and then
969               // #(state.activeHedges) will be decreasing. This guarantees that even there may be
970               // multiple concurrent hedges, one of the hedges will end up committed.
971               if (hedgingPlan.isHedgeable) {
972                 if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
973                   return;
974                 }
975                 // else, no activeHedges, no new hedges possible, try to commit
976               } // else, isHedgeable is false, try to commit
977             }
978           } else {
979             RetryPlan retryPlan = makeRetryDecision(status, trailers);
980             if (retryPlan.shouldRetry) {
981               // retry
982               Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
983               if (newSubstream == null) {
984                 return;
985               }
986               // The check state.winningSubstream == null, checking if is not already committed, is
987               // racy, but is still safe b/c the retry will also handle committed/cancellation
988               FutureCanceller scheduledRetryCopy;
989               synchronized (lock) {
990                 scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
991               }
992               class RetryBackoffRunnable implements Runnable {
993                 @Override
994                 public void run() {
995                   callExecutor.execute(
996                       new Runnable() {
997                         @Override
998                         public void run() {
999                           drain(newSubstream);
1000                         }
1001                       });
1002                 }
1003               }
1004 
1005               scheduledRetryCopy.setFuture(
1006                   scheduledExecutorService.schedule(
1007                       new RetryBackoffRunnable(),
1008                       retryPlan.backoffNanos,
1009                       TimeUnit.NANOSECONDS));
1010               return;
1011             }
1012           }
1013         }
1014       }
1015 
1016       commitAndRun(substream);
1017       if (state.winningSubstream == substream) {
1018         safeCloseMasterListener(status, rpcProgress, trailers);
1019       }
1020     }
1021 
1022     /**
1023      * Decides in current situation whether or not the RPC should retry and if it should retry how
1024      * long the backoff should be. The decision does not take the commitment status into account, so
1025      * caller should check it separately. It also updates the throttle. It does not change state.
1026      */
makeRetryDecision(Status status, Metadata trailer)1027     private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
1028       if (retryPolicy == null) {
1029         return new RetryPlan(false, 0);
1030       }
1031       boolean shouldRetry = false;
1032       long backoffNanos = 0L;
1033       boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
1034       Integer pushbackMillis = getPushbackMills(trailer);
1035       boolean isThrottled = false;
1036       if (throttle != null) {
1037         if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
1038           isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1039         }
1040       }
1041 
1042       if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
1043         if (pushbackMillis == null) {
1044           if (isRetryableStatusCode) {
1045             shouldRetry = true;
1046             backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
1047             nextBackoffIntervalNanos = Math.min(
1048                 (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
1049                 retryPolicy.maxBackoffNanos);
1050           } // else no retry
1051         } else if (pushbackMillis >= 0) {
1052           shouldRetry = true;
1053           backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
1054           nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
1055         } // else no retry
1056       } // else no retry
1057 
1058       return new RetryPlan(shouldRetry, backoffNanos);
1059     }
1060 
makeHedgingDecision(Status status, Metadata trailer)1061     private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
1062       Integer pushbackMillis = getPushbackMills(trailer);
1063       boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
1064       boolean isThrottled = false;
1065       if (throttle != null) {
1066         if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
1067           isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
1068         }
1069       }
1070       return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
1071     }
1072 
1073     @Nullable
getPushbackMills(Metadata trailer)1074     private Integer getPushbackMills(Metadata trailer) {
1075       String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
1076       Integer pushbackMillis = null;
1077       if (pushbackStr != null) {
1078         try {
1079           pushbackMillis = Integer.valueOf(pushbackStr);
1080         } catch (NumberFormatException e) {
1081           pushbackMillis = -1;
1082         }
1083       }
1084       return pushbackMillis;
1085     }
1086 
1087     @Override
messagesAvailable(final MessageProducer producer)1088     public void messagesAvailable(final MessageProducer producer) {
1089       State savedState = state;
1090       checkState(
1091           savedState.winningSubstream != null, "Headers should be received prior to messages.");
1092       if (savedState.winningSubstream != substream) {
1093         GrpcUtil.closeQuietly(producer);
1094         return;
1095       }
1096       listenerSerializeExecutor.execute(
1097           new Runnable() {
1098             @Override
1099             public void run() {
1100               masterListener.messagesAvailable(producer);
1101             }
1102           });
1103     }
1104 
1105     @Override
onReady()1106     public void onReady() {
1107       // FIXME(#7089): hedging case is broken.
1108       if (!isReady()) {
1109         return;
1110       }
1111       listenerSerializeExecutor.execute(
1112           new Runnable() {
1113             @Override
1114             public void run() {
1115               if (!isClosed) {
1116                 masterListener.onReady();
1117               }
1118             }
1119           });
1120     }
1121   }
1122 
1123   private static final class State {
1124     /** Committed and the winning substream drained. */
1125     final boolean passThrough;
1126 
1127     /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
1128     @Nullable final List<BufferEntry> buffer;
1129 
1130     /**
1131      * Unmodifiable collection of all the open substreams that are drained. Singleton once
1132      * passThrough; Empty if committed but not passTrough.
1133      */
1134     final Collection<Substream> drainedSubstreams;
1135 
1136     /**
1137      * Unmodifiable collection of all the active hedging substreams.
1138      *
1139      * <p>A substream even with the attribute substream.closed being true may be considered still
1140      * "active" at the moment as long as it is in this collection.
1141      */
1142     final Collection<Substream> activeHedges; // not null once isHedging = true
1143 
1144     final int hedgingAttemptCount;
1145 
1146     /** Null until committed. */
1147     @Nullable final Substream winningSubstream;
1148 
1149     /** Not required to set to true when cancelled, but can short-circuit the draining process. */
1150     final boolean cancelled;
1151 
1152     /** No more hedging due to events like drop or pushback. */
1153     final boolean hedgingFrozen;
1154 
State( @ullable List<BufferEntry> buffer, Collection<Substream> drainedSubstreams, Collection<Substream> activeHedges, @Nullable Substream winningSubstream, boolean cancelled, boolean passThrough, boolean hedgingFrozen, int hedgingAttemptCount)1155     State(
1156         @Nullable List<BufferEntry> buffer,
1157         Collection<Substream> drainedSubstreams,
1158         Collection<Substream> activeHedges,
1159         @Nullable Substream winningSubstream,
1160         boolean cancelled,
1161         boolean passThrough,
1162         boolean hedgingFrozen,
1163         int hedgingAttemptCount) {
1164       this.buffer = buffer;
1165       this.drainedSubstreams =
1166           checkNotNull(drainedSubstreams, "drainedSubstreams");
1167       this.winningSubstream = winningSubstream;
1168       this.activeHedges = activeHedges;
1169       this.cancelled = cancelled;
1170       this.passThrough = passThrough;
1171       this.hedgingFrozen = hedgingFrozen;
1172       this.hedgingAttemptCount = hedgingAttemptCount;
1173 
1174       checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
1175       checkState(
1176           !passThrough || winningSubstream != null,
1177           "passThrough should imply winningSubstream != null");
1178       checkState(
1179           !passThrough
1180               || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
1181               || (drainedSubstreams.size() == 0 && winningSubstream.closed),
1182           "passThrough should imply winningSubstream is drained");
1183       checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
1184     }
1185 
1186     @CheckReturnValue
1187     // GuardedBy RetriableStream.lock
cancelled()1188     State cancelled() {
1189       return new State(
1190           buffer, drainedSubstreams, activeHedges, winningSubstream, true, passThrough,
1191           hedgingFrozen, hedgingAttemptCount);
1192     }
1193 
1194     /** The given substream is drained. */
1195     @CheckReturnValue
1196     // GuardedBy RetriableStream.lock
substreamDrained(Substream substream)1197     State substreamDrained(Substream substream) {
1198       checkState(!passThrough, "Already passThrough");
1199 
1200       Collection<Substream> drainedSubstreams;
1201 
1202       if (substream.closed) {
1203         drainedSubstreams = this.drainedSubstreams;
1204       } else if (this.drainedSubstreams.isEmpty()) {
1205         // optimize for 0-retry, which is most of the cases.
1206         drainedSubstreams = Collections.singletonList(substream);
1207       } else {
1208         drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1209         drainedSubstreams.add(substream);
1210         drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1211       }
1212 
1213       boolean passThrough = winningSubstream != null;
1214 
1215       List<BufferEntry> buffer = this.buffer;
1216       if (passThrough) {
1217         checkState(
1218             winningSubstream == substream, "Another RPC attempt has already committed");
1219         buffer = null;
1220       }
1221 
1222       return new State(
1223           buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1224           hedgingFrozen, hedgingAttemptCount);
1225     }
1226 
1227     /** The given substream is closed. */
1228     @CheckReturnValue
1229     // GuardedBy RetriableStream.lock
substreamClosed(Substream substream)1230     State substreamClosed(Substream substream) {
1231       substream.closed = true;
1232       if (this.drainedSubstreams.contains(substream)) {
1233         Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
1234         drainedSubstreams.remove(substream);
1235         drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
1236         return new State(
1237             buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1238             hedgingFrozen, hedgingAttemptCount);
1239       } else {
1240         return this;
1241       }
1242     }
1243 
1244     @CheckReturnValue
1245     // GuardedBy RetriableStream.lock
committed(Substream winningSubstream)1246     State committed(Substream winningSubstream) {
1247       checkState(this.winningSubstream == null, "Already committed");
1248 
1249       boolean passThrough = false;
1250       List<BufferEntry> buffer = this.buffer;
1251       Collection<Substream> drainedSubstreams;
1252 
1253       if (this.drainedSubstreams.contains(winningSubstream)) {
1254         passThrough = true;
1255         buffer = null;
1256         drainedSubstreams = Collections.singleton(winningSubstream);
1257       } else {
1258         drainedSubstreams = Collections.emptyList();
1259       }
1260 
1261       return new State(
1262           buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1263           hedgingFrozen, hedgingAttemptCount);
1264     }
1265 
1266     @CheckReturnValue
1267     // GuardedBy RetriableStream.lock
freezeHedging()1268     State freezeHedging() {
1269       if (hedgingFrozen) {
1270         return this;
1271       }
1272       return new State(
1273           buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1274           true, hedgingAttemptCount);
1275     }
1276 
1277     @CheckReturnValue
1278     // GuardedBy RetriableStream.lock
1279     // state.hedgingAttemptCount is modified only here.
1280     // The method is only called in RetriableStream.start() and HedgingRunnable.run()
addActiveHedge(Substream substream)1281     State addActiveHedge(Substream substream) {
1282       // hasPotentialHedging must be true
1283       checkState(!hedgingFrozen, "hedging frozen");
1284       checkState(winningSubstream == null, "already committed");
1285 
1286       Collection<Substream> activeHedges;
1287       if (this.activeHedges == null) {
1288         activeHedges = Collections.singleton(substream);
1289       } else {
1290         activeHedges = new ArrayList<>(this.activeHedges);
1291         activeHedges.add(substream);
1292         activeHedges = Collections.unmodifiableCollection(activeHedges);
1293       }
1294 
1295       int hedgingAttemptCount = this.hedgingAttemptCount + 1;
1296       return new State(
1297           buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1298           hedgingFrozen, hedgingAttemptCount);
1299     }
1300 
1301     @CheckReturnValue
1302     // GuardedBy RetriableStream.lock
1303     // The method is only called in Sublistener.closed()
removeActiveHedge(Substream substream)1304     State removeActiveHedge(Substream substream) {
1305       Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1306       activeHedges.remove(substream);
1307       activeHedges = Collections.unmodifiableCollection(activeHedges);
1308 
1309       return new State(
1310           buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1311           hedgingFrozen, hedgingAttemptCount);
1312     }
1313 
1314     @CheckReturnValue
1315     // GuardedBy RetriableStream.lock
1316     // The method is only called for transparent retry.
replaceActiveHedge(Substream oldOne, Substream newOne)1317     State replaceActiveHedge(Substream oldOne, Substream newOne) {
1318       Collection<Substream> activeHedges = new ArrayList<>(this.activeHedges);
1319       activeHedges.remove(oldOne);
1320       activeHedges.add(newOne);
1321       activeHedges = Collections.unmodifiableCollection(activeHedges);
1322 
1323       return new State(
1324           buffer, drainedSubstreams, activeHedges, winningSubstream, cancelled, passThrough,
1325           hedgingFrozen, hedgingAttemptCount);
1326     }
1327   }
1328 
1329   /**
1330    * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
1331    *  attributes.
1332    */
1333   private static final class Substream {
1334     ClientStream stream;
1335 
1336     // GuardedBy RetriableStream.lock
1337     boolean closed;
1338 
1339     // setting to true must be GuardedBy RetriableStream.lock
1340     boolean bufferLimitExceeded;
1341 
1342     final int previousAttemptCount;
1343 
Substream(int previousAttemptCount)1344     Substream(int previousAttemptCount) {
1345       this.previousAttemptCount = previousAttemptCount;
1346     }
1347   }
1348 
1349 
1350   /**
1351    * Traces the buffer used by a substream.
1352    */
1353   class BufferSizeTracer extends ClientStreamTracer {
1354     // Each buffer size tracer is dedicated to one specific substream.
1355     private final Substream substream;
1356 
1357     @GuardedBy("lock")
1358     long bufferNeeded;
1359 
BufferSizeTracer(Substream substream)1360     BufferSizeTracer(Substream substream) {
1361       this.substream = substream;
1362     }
1363 
1364     /**
1365      * A message is sent to the wire, so its reference would be released if no retry or
1366      * hedging were involved. So at this point we have to hold the reference of the message longer
1367      * for retry, and we need to increment {@code substream.bufferNeeded}.
1368      */
1369     @Override
outboundWireSize(long bytes)1370     public void outboundWireSize(long bytes) {
1371       if (state.winningSubstream != null) {
1372         return;
1373       }
1374 
1375       Runnable postCommitTask = null;
1376 
1377       // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
1378       synchronized (lock) {
1379         if (state.winningSubstream != null || substream.closed) {
1380           return;
1381         }
1382         bufferNeeded += bytes;
1383         if (bufferNeeded <= perRpcBufferUsed) {
1384           return;
1385         }
1386 
1387         if (bufferNeeded > perRpcBufferLimit) {
1388           substream.bufferLimitExceeded = true;
1389         } else {
1390           // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
1391           long savedChannelBufferUsed =
1392               channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
1393           perRpcBufferUsed = bufferNeeded;
1394 
1395           if (savedChannelBufferUsed > channelBufferLimit) {
1396             substream.bufferLimitExceeded = true;
1397           }
1398         }
1399 
1400         if (substream.bufferLimitExceeded) {
1401           postCommitTask = commit(substream);
1402         }
1403       }
1404 
1405       if (postCommitTask != null) {
1406         postCommitTask.run();
1407       }
1408     }
1409   }
1410 
1411   /**
1412    *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
1413    *  the Channel. There should be a single instance of it for each channel.
1414    */
1415   static final class ChannelBufferMeter {
1416     private final AtomicLong bufferUsed = new AtomicLong();
1417 
1418     @VisibleForTesting
addAndGet(long newBytesUsed)1419     long addAndGet(long newBytesUsed) {
1420       return bufferUsed.addAndGet(newBytesUsed);
1421     }
1422   }
1423 
1424   /**
1425    * Used for retry throttling.
1426    */
1427   static final class Throttle {
1428 
1429     private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
1430 
1431     /**
1432      * 1000 times the maxTokens field of the retryThrottling policy in service config.
1433      * The number of tokens starts at maxTokens. The token_count will always be between 0 and
1434      * maxTokens.
1435      */
1436     final int maxTokens;
1437 
1438     /**
1439      * Half of {@code maxTokens}.
1440      */
1441     final int threshold;
1442 
1443     /**
1444      * 1000 times the tokenRatio field of the retryThrottling policy in service config.
1445      */
1446     final int tokenRatio;
1447 
1448     final AtomicInteger tokenCount = new AtomicInteger();
1449 
Throttle(float maxTokens, float tokenRatio)1450     Throttle(float maxTokens, float tokenRatio) {
1451       // tokenRatio is up to 3 decimal places
1452       this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
1453       this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
1454       this.threshold = this.maxTokens / 2;
1455       tokenCount.set(this.maxTokens);
1456     }
1457 
1458     @VisibleForTesting
isAboveThreshold()1459     boolean isAboveThreshold() {
1460       return tokenCount.get() > threshold;
1461     }
1462 
1463     /**
1464      * Counts down the token on qualified failure and checks if it is above the threshold
1465      * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
1466      * a not-to-retry pushback.
1467      */
1468     @VisibleForTesting
onQualifiedFailureThenCheckIsAboveThreshold()1469     boolean onQualifiedFailureThenCheckIsAboveThreshold() {
1470       while (true) {
1471         int currentCount = tokenCount.get();
1472         if (currentCount == 0) {
1473           return false;
1474         }
1475         int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
1476         boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
1477         if (updated) {
1478           return decremented > threshold;
1479         }
1480       }
1481     }
1482 
1483     @VisibleForTesting
onSuccess()1484     void onSuccess() {
1485       while (true) {
1486         int currentCount = tokenCount.get();
1487         if (currentCount == maxTokens) {
1488           break;
1489         }
1490         int incremented = currentCount + tokenRatio;
1491         boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
1492         if (updated) {
1493           break;
1494         }
1495       }
1496     }
1497 
1498     @Override
equals(Object o)1499     public boolean equals(Object o) {
1500       if (this == o) {
1501         return true;
1502       }
1503       if (!(o instanceof Throttle)) {
1504         return false;
1505       }
1506       Throttle that = (Throttle) o;
1507       return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
1508     }
1509 
1510     @Override
hashCode()1511     public int hashCode() {
1512       return Objects.hashCode(maxTokens, tokenRatio);
1513     }
1514   }
1515 
1516   private static final class RetryPlan {
1517     final boolean shouldRetry;
1518     final long backoffNanos;
1519 
RetryPlan(boolean shouldRetry, long backoffNanos)1520     RetryPlan(boolean shouldRetry, long backoffNanos) {
1521       this.shouldRetry = shouldRetry;
1522       this.backoffNanos = backoffNanos;
1523     }
1524   }
1525 
1526   private static final class HedgingPlan {
1527     final boolean isHedgeable;
1528     @Nullable
1529     final Integer hedgingPushbackMillis;
1530 
HedgingPlan( boolean isHedgeable, @Nullable Integer hedgingPushbackMillis)1531     public HedgingPlan(
1532         boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
1533       this.isHedgeable = isHedgeable;
1534       this.hedgingPushbackMillis = hedgingPushbackMillis;
1535     }
1536   }
1537 
1538   /** Allows cancelling a Future without racing with setting the future. */
1539   private static final class FutureCanceller {
1540 
1541     final Object lock;
1542     @GuardedBy("lock")
1543     Future<?> future;
1544     @GuardedBy("lock")
1545     boolean cancelled;
1546 
FutureCanceller(Object lock)1547     FutureCanceller(Object lock) {
1548       this.lock = lock;
1549     }
1550 
setFuture(Future<?> future)1551     void setFuture(Future<?> future) {
1552       synchronized (lock) {
1553         if (!cancelled) {
1554           this.future = future;
1555         }
1556       }
1557     }
1558 
1559     @GuardedBy("lock")
1560     @CheckForNull // Must cancel the returned future if not null.
markCancelled()1561     Future<?> markCancelled() {
1562       cancelled = true;
1563       return future;
1564     }
1565 
1566     @GuardedBy("lock")
isCancelled()1567     boolean isCancelled() {
1568       return cancelled;
1569     }
1570   }
1571 }
1572