• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static java.util.concurrent.TimeUnit.NANOSECONDS;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.MoreObjects;
25 import io.grpc.Attributes;
26 import io.grpc.ClientCall;
27 import io.grpc.Context;
28 import io.grpc.Deadline;
29 import io.grpc.Metadata;
30 import io.grpc.Status;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Locale;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40 import javax.annotation.Nullable;
41 import javax.annotation.concurrent.GuardedBy;
42 
43 /**
44  * A call that queues requests before a real call is ready to be delegated to.
45  *
46  * <p>{@code ClientCall} itself doesn't require thread-safety. However, the state of {@code
47  * DelayedCall} may be internally altered by different threads, thus internal synchronization is
48  * necessary.
49  */
50 public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
51   private static final Logger logger = Logger.getLogger(DelayedClientCall.class.getName());
52   /**
53    * A timer to monitor the initial deadline. The timer must be cancelled on transition to the real
54    * call.
55    */
56   @Nullable
57   private final ScheduledFuture<?> initialDeadlineMonitor;
58   private final Executor callExecutor;
59   private final Context context;
60   /** {@code true} once realCall is valid and all pending calls have been drained. */
61   private volatile boolean passThrough;
62   /**
63    * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
64    * order, but also used if an error occurs before {@code realCall} is set.
65    */
66   private Listener<RespT> listener;
67   // Must hold {@code this} lock when setting.
68   private ClientCall<ReqT, RespT> realCall;
69   @GuardedBy("this")
70   private Status error;
71   @GuardedBy("this")
72   private List<Runnable> pendingRunnables = new ArrayList<>();
73   @GuardedBy("this")
74   private DelayedListener<RespT> delayedListener;
75 
DelayedClientCall( Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline)76   protected DelayedClientCall(
77       Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
78     this.callExecutor = checkNotNull(callExecutor, "callExecutor");
79     checkNotNull(scheduler, "scheduler");
80     context = Context.current();
81     initialDeadlineMonitor = scheduleDeadlineIfNeeded(scheduler, deadline);
82   }
83 
84   // If one argument is null, consider the other the "Before"
isAbeforeB(@ullable Deadline a, @Nullable Deadline b)85   private boolean isAbeforeB(@Nullable Deadline a, @Nullable Deadline b) {
86     if (b == null) {
87       return true;
88     } else if (a == null) {
89       return false;
90     }
91 
92     return a.isBefore(b);
93   }
94 
95   @Nullable
scheduleDeadlineIfNeeded( ScheduledExecutorService scheduler, @Nullable Deadline deadline)96   private ScheduledFuture<?> scheduleDeadlineIfNeeded(
97       ScheduledExecutorService scheduler, @Nullable Deadline deadline) {
98     Deadline contextDeadline = context.getDeadline();
99     if (deadline == null && contextDeadline == null) {
100       return null;
101     }
102     long remainingNanos = Long.MAX_VALUE;
103     if (deadline != null) {
104       remainingNanos = deadline.timeRemaining(NANOSECONDS);
105     }
106 
107     if (contextDeadline != null && contextDeadline.timeRemaining(NANOSECONDS) < remainingNanos) {
108       remainingNanos = contextDeadline.timeRemaining(NANOSECONDS);
109       if (logger.isLoggable(Level.FINE)) {
110         StringBuilder builder =
111             new StringBuilder(
112                 String.format(
113                     Locale.US,
114                     "Call timeout set to '%d' ns, due to context deadline.", remainingNanos));
115         if (deadline == null) {
116           builder.append(" Explicit call timeout was not set.");
117         } else {
118           long callTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS);
119           builder.append(String.format(
120               Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
121         }
122         logger.fine(builder.toString());
123       }
124     }
125 
126     long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
127     long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
128     final StringBuilder buf = new StringBuilder();
129     String deadlineName = isAbeforeB(contextDeadline, deadline) ? "Context" : "CallOptions";
130     if (remainingNanos < 0) {
131       buf.append("ClientCall started after ");
132       buf.append(deadlineName);
133       buf.append(" deadline was exceeded. Deadline has been exceeded for ");
134     } else {
135       buf.append("Deadline ");
136       buf.append(deadlineName);
137       buf.append(" will be exceeded in ");
138     }
139     buf.append(seconds);
140     buf.append(String.format(Locale.US, ".%09d", nanos));
141     buf.append("s. ");
142 
143     /** Cancels the call if deadline exceeded prior to the real call being set. */
144     class DeadlineExceededRunnable implements Runnable {
145       @Override
146       public void run() {
147         cancel(
148             Status.DEADLINE_EXCEEDED.withDescription(buf.toString()),
149             // We should not cancel the call if the realCall is set because there could be a
150             // race between cancel() and realCall.start(). The realCall will handle deadline by
151             // itself.
152             /* onlyCancelPendingCall= */ true);
153       }
154     }
155 
156     return scheduler.schedule(new DeadlineExceededRunnable(), remainingNanos, NANOSECONDS);
157   }
158 
159   /**
160    * Transfers all pending and future requests and mutations to the given call.
161    *
162    * <p>No-op if either this method or {@link #cancel} have already been called.
163    */
164   // When this method returns, passThrough is guaranteed to be true
setCall(ClientCall<ReqT, RespT> call)165   public final Runnable setCall(ClientCall<ReqT, RespT> call) {
166     synchronized (this) {
167       // If realCall != null, then either setCall() or cancel() has been called.
168       if (realCall != null) {
169         return null;
170       }
171       setRealCall(checkNotNull(call, "call"));
172     }
173     return new ContextRunnable(context) {
174       @Override
175       public void runInContext() {
176         drainPendingCalls();
177       }
178     };
179   }
180 
181   @Override
182   public final void start(Listener<RespT> listener, final Metadata headers) {
183     checkState(this.listener == null, "already started");
184     Status savedError;
185     boolean savedPassThrough;
186     synchronized (this) {
187       this.listener = checkNotNull(listener, "listener");
188       // If error != null, then cancel() has been called and was unable to close the listener
189       savedError = error;
190       savedPassThrough = passThrough;
191       if (!savedPassThrough) {
192         listener = delayedListener = new DelayedListener<>(listener);
193       }
194     }
195     if (savedError != null) {
196       callExecutor.execute(new CloseListenerRunnable(listener, savedError));
197       return;
198     }
199     if (savedPassThrough) {
200       realCall.start(listener, headers);
201     } else {
202       final Listener<RespT> finalListener = listener;
203       delayOrExecute(new Runnable() {
204         @Override
205         public void run() {
206           realCall.start(finalListener, headers);
207         }
208       });
209     }
210   }
211 
212   // When this method returns, passThrough is guaranteed to be true
213   @Override
214   public final void cancel(@Nullable final String message, @Nullable final Throwable cause) {
215     Status status = Status.CANCELLED;
216     if (message != null) {
217       status = status.withDescription(message);
218     } else {
219       status = status.withDescription("Call cancelled without message");
220     }
221     if (cause != null) {
222       status = status.withCause(cause);
223     }
224     cancel(status, false);
225   }
226 
227   /**
228    * Cancels the call unless {@code realCall} is set and {@code onlyCancelPendingCall} is true.
229    */
230   private void cancel(final Status status, boolean onlyCancelPendingCall) {
231     boolean delegateToRealCall = true;
232     Listener<RespT> listenerToClose = null;
233     synchronized (this) {
234       // If realCall != null, then either setCall() or cancel() has been called
235       if (realCall == null) {
236         @SuppressWarnings("unchecked")
237         ClientCall<ReqT, RespT> noopCall = (ClientCall<ReqT, RespT>) NOOP_CALL;
238         setRealCall(noopCall);
239         delegateToRealCall = false;
240         // If listener == null, then start() will later call listener with 'error'
241         listenerToClose = listener;
242         error = status;
243       } else if (onlyCancelPendingCall) {
244         return;
245       }
246     }
247     if (delegateToRealCall) {
248       delayOrExecute(new Runnable() {
249         @Override
250         public void run() {
251           realCall.cancel(status.getDescription(), status.getCause());
252         }
253       });
254     } else {
255       if (listenerToClose != null) {
256         callExecutor.execute(new CloseListenerRunnable(listenerToClose, status));
257       }
258       drainPendingCalls();
259     }
260     callCancelled();
261   }
262 
263   protected void callCancelled() {
264   }
265 
266   private void delayOrExecute(Runnable runnable) {
267     synchronized (this) {
268       if (!passThrough) {
269         pendingRunnables.add(runnable);
270         return;
271       }
272     }
273     runnable.run();
274   }
275 
276   /**
277    * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
278    * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
279    * should not be held when calling this method.
280    */
281   private void drainPendingCalls() {
282     assert realCall != null;
283     assert !passThrough;
284     List<Runnable> toRun = new ArrayList<>();
285     DelayedListener<RespT> delayedListener ;
286     while (true) {
287       synchronized (this) {
288         if (pendingRunnables.isEmpty()) {
289           pendingRunnables = null;
290           passThrough = true;
291           delayedListener = this.delayedListener;
292           break;
293         }
294         // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
295         // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
296         // drop the lock. So we will have to re-check pendingCalls.
297         List<Runnable> tmp = toRun;
298         toRun = pendingRunnables;
299         pendingRunnables = tmp;
300       }
301       for (Runnable runnable : toRun) {
302         // Must not call transport while lock is held to prevent deadlocks.
303         // TODO(ejona): exception handling
304         runnable.run();
305       }
306       toRun.clear();
307     }
308     if (delayedListener != null) {
309       final DelayedListener<RespT> listener = delayedListener;
310       class DrainListenerRunnable extends ContextRunnable {
311         DrainListenerRunnable() {
312           super(context);
313         }
314 
315         @Override
316         public void runInContext() {
317           listener.drainPendingCallbacks();
318         }
319       }
320 
321       callExecutor.execute(new DrainListenerRunnable());
322     }
323   }
324 
325   @GuardedBy("this")
326   private void setRealCall(ClientCall<ReqT, RespT> realCall) {
327     checkState(this.realCall == null, "realCall already set to %s", this.realCall);
328     if (initialDeadlineMonitor != null) {
329       initialDeadlineMonitor.cancel(false);
330     }
331     this.realCall = realCall;
332   }
333 
334   @VisibleForTesting
335   final ClientCall<ReqT, RespT> getRealCall() {
336     return realCall;
337   }
338 
339   @Override
340   public final void sendMessage(final ReqT message) {
341     if (passThrough) {
342       realCall.sendMessage(message);
343     } else {
344       delayOrExecute(new Runnable() {
345         @Override
346         public void run() {
347           realCall.sendMessage(message);
348         }
349       });
350     }
351   }
352 
353   @Override
354   public final void setMessageCompression(final boolean enable) {
355     if (passThrough) {
356       realCall.setMessageCompression(enable);
357     } else {
358       delayOrExecute(new Runnable() {
359         @Override
360         public void run() {
361           realCall.setMessageCompression(enable);
362         }
363       });
364     }
365   }
366 
367   @Override
368   public final void request(final int numMessages) {
369     if (passThrough) {
370       realCall.request(numMessages);
371     } else {
372       delayOrExecute(new Runnable() {
373         @Override
374         public void run() {
375           realCall.request(numMessages);
376         }
377       });
378     }
379   }
380 
381   @Override
382   public final void halfClose() {
383     delayOrExecute(new Runnable() {
384       @Override
385       public void run() {
386         realCall.halfClose();
387       }
388     });
389   }
390 
391   @Override
392   public final boolean isReady() {
393     if (passThrough) {
394       return realCall.isReady();
395     } else {
396       return false;
397     }
398   }
399 
400   @Override
401   public final Attributes getAttributes() {
402     ClientCall<ReqT, RespT> savedRealCall;
403     synchronized (this) {
404       savedRealCall = realCall;
405     }
406     if (savedRealCall != null) {
407       return savedRealCall.getAttributes();
408     } else {
409       return Attributes.EMPTY;
410     }
411   }
412 
413   @Override
414   public String toString() {
415     return MoreObjects.toStringHelper(this)
416         .add("realCall", realCall)
417         .toString();
418   }
419 
420   private final class CloseListenerRunnable extends ContextRunnable {
421     final Listener<RespT> listener;
422     final Status status;
423 
424     CloseListenerRunnable(Listener<RespT> listener, Status status) {
425       super(context);
426       this.listener = listener;
427       this.status = status;
428     }
429 
430     @Override
431     public void runInContext() {
432       listener.onClose(status, new Metadata());
433     }
434   }
435 
436   private static final class DelayedListener<RespT> extends Listener<RespT> {
437     private final Listener<RespT> realListener;
438     private volatile boolean passThrough;
439     @GuardedBy("this")
440     private List<Runnable> pendingCallbacks = new ArrayList<>();
441 
442     public DelayedListener(Listener<RespT> listener) {
443       this.realListener = listener;
444     }
445 
446     private void delayOrExecute(Runnable runnable) {
447       synchronized (this) {
448         if (!passThrough) {
449           pendingCallbacks.add(runnable);
450           return;
451         }
452       }
453       runnable.run();
454     }
455 
456     @Override
457     public void onHeaders(final Metadata headers) {
458       if (passThrough) {
459         realListener.onHeaders(headers);
460       } else {
461         delayOrExecute(new Runnable() {
462           @Override
463           public void run() {
464             realListener.onHeaders(headers);
465           }
466         });
467       }
468     }
469 
470     @Override
471     public void onMessage(final RespT message) {
472       if (passThrough) {
473         realListener.onMessage(message);
474       } else {
475         delayOrExecute(new Runnable() {
476           @Override
477           public void run() {
478             realListener.onMessage(message);
479           }
480         });
481       }
482     }
483 
484     @Override
485     public void onClose(final Status status, final Metadata trailers) {
486       delayOrExecute(new Runnable() {
487         @Override
488         public void run() {
489           realListener.onClose(status, trailers);
490         }
491       });
492     }
493 
494     @Override
495     public void onReady() {
496       if (passThrough) {
497         realListener.onReady();
498       } else {
499         delayOrExecute(new Runnable() {
500           @Override
501           public void run() {
502             realListener.onReady();
503           }
504         });
505       }
506     }
507 
508     void drainPendingCallbacks() {
509       assert !passThrough;
510       List<Runnable> toRun = new ArrayList<>();
511       while (true) {
512         synchronized (this) {
513           if (pendingCallbacks.isEmpty()) {
514             pendingCallbacks = null;
515             passThrough = true;
516             break;
517           }
518           // Since there were pendingCallbacks, we need to process them. To maintain ordering we
519           // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
520           // added after we drop the lock. So we will have to re-check pendingCallbacks.
521           List<Runnable> tmp = toRun;
522           toRun = pendingCallbacks;
523           pendingCallbacks = tmp;
524         }
525         for (Runnable runnable : toRun) {
526           // Avoid calling listener while lock is held to prevent deadlocks.
527           // TODO(ejona): exception handling
528           runnable.run();
529         }
530         toRun.clear();
531       }
532     }
533   }
534 
535   private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
536     @Override
537     public void start(Listener<Object> responseListener, Metadata headers) {}
538 
539     @Override
540     public void request(int numMessages) {}
541 
542     @Override
543     public void cancel(String message, Throwable cause) {}
544 
545     @Override
546     public void halfClose() {}
547 
548     @Override
549     public void sendMessage(Object message) {}
550 
551     // Always returns {@code false}, since this is only used when the startup of the call fails.
552     @Override
553     public boolean isReady() {
554       return false;
555     }
556   };
557 }
558