• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import java.util.ArrayDeque;
23 import java.util.Queue;
24 import java.util.logging.Level;
25 import java.util.logging.Logger;
26 import javax.annotation.concurrent.GuardedBy;
27 import javax.annotation.concurrent.ThreadSafe;
28 
29 /**
30  * The thread-less Channel Executor used to run the state mutation logic in {@link
31  * ManagedChannelImpl}, {@link InternalSubchannel} and {@link io.grpc.LoadBalancer}s.
32  *
33  * <p>Tasks are queued until {@link #drain} is called.  Tasks are guaranteed to be run in the same
34  * order as they are submitted.
35  */
36 @ThreadSafe
37 class ChannelExecutor {
38   private static final Logger log = Logger.getLogger(ChannelExecutor.class.getName());
39 
40   private final Object lock = new Object();
41 
42   @GuardedBy("lock")
43   private final Queue<Runnable> queue = new ArrayDeque<Runnable>();
44   @GuardedBy("lock")
45   private boolean draining;
46 
47   /**
48    * Run all tasks in the queue in the current thread, if no other thread is in this method.
49    * Otherwise do nothing.
50    *
51    * <p>Upon returning, it guarantees that all tasks submitted by {@code executeLater()} before it
52    * have been or will eventually be run, while not requiring any more calls to {@code drain()}.
53    */
drain()54   final void drain() {
55     boolean drainLeaseAcquired = false;
56     while (true) {
57       Runnable runnable;
58       synchronized (lock) {
59         if (!drainLeaseAcquired) {
60           if (draining) {
61             return;
62           }
63           draining = true;
64           drainLeaseAcquired = true;
65         }
66         runnable = queue.poll();
67         if (runnable == null) {
68           draining = false;
69           break;
70         }
71       }
72       try {
73         runnable.run();
74       } catch (Throwable t) {
75         handleUncaughtThrowable(t);
76       }
77     }
78   }
79 
80   /**
81    * Enqueues a task that will be run when {@link #drain} is called.
82    *
83    * @return this ChannelExecutor
84    */
executeLater(Runnable runnable)85   final ChannelExecutor executeLater(Runnable runnable) {
86     synchronized (lock) {
87       queue.add(checkNotNull(runnable, "runnable is null"));
88     }
89     return this;
90   }
91 
92   @VisibleForTesting
numPendingTasks()93   final int numPendingTasks() {
94     synchronized (lock) {
95       return queue.size();
96     }
97   }
98 
99   /**
100    * Handle a throwable from a task.
101    *
102    * <p>The default implementation logs a warning.
103    */
handleUncaughtThrowable(Throwable t)104   void handleUncaughtThrowable(Throwable t) {
105     log.log(Level.WARNING, "Runnable threw exception in ChannelExecutor", t);
106   }
107 }
108