• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2018 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.util.Queue;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import javax.annotation.concurrent.ThreadSafe;
31 
32 /**
33  * A synchronization context is a queue of tasks that run in sequence.  It offers following
34  * guarantees:
35  *
36  * <ul>
37  *    <li>Ordering.  Tasks are run in the same order as they are submitted via {@link #execute}
38  *        and {@link #executeLater}.</li>
39  *    <li>Serialization.  Tasks are run in sequence and establish a happens-before relationship
40  *        between them. </li>
41  *    <li>Non-reentrancy.  If a task running in a synchronization context executes or schedules
42  *        another task in the same synchronization context, the latter task will never run
43  *        inline.  It will instead be queued and run only after the current task has returned.</li>
44  * </ul>
45  *
46  * <p>It doesn't own any thread.  Tasks are run from caller's or caller-provided threads.
47  *
48  * <p>Conceptually, it is fairly accurate to think of {@code SynchronizationContext} like a cheaper
49  * {@code Executors.newSingleThreadExecutor()} when used for synchronization (not long-running
50  * tasks). Both use a queue for tasks that are run in order and neither guarantee that tasks have
51  * completed before returning from {@code execute()}. However, the behavior does diverge if locks
52  * are held when calling the context. So it is encouraged to avoid mixing locks and synchronization
53  * context except via {@link #executeLater}.
54  *
55  * <p>This class is thread-safe.
56  *
57  * @since 1.17.0
58  */
59 @ThreadSafe
60 public final class SynchronizationContext implements Executor {
61   private final UncaughtExceptionHandler uncaughtExceptionHandler;
62 
63   private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
64   private final AtomicReference<Thread> drainingThread = new AtomicReference<>();
65 
66   /**
67    * Creates a SynchronizationContext.
68    *
69    * @param uncaughtExceptionHandler handles exceptions thrown out of the tasks.  Different from
70    *        what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is
71    *        not terminated when the handler is called.
72    */
SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler)73   public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) {
74     this.uncaughtExceptionHandler =
75         checkNotNull(uncaughtExceptionHandler, "uncaughtExceptionHandler");
76   }
77 
78   /**
79    * Run all tasks in the queue in the current thread, if no other thread is running this method.
80    * Otherwise do nothing.
81    *
82    * <p>Upon returning, it guarantees that all tasks submitted by {@code #executeLater} before it
83    * have been or will eventually be run, while not requiring any more calls to {@code drain()}.
84    */
drain()85   public final void drain() {
86     do {
87       if (!drainingThread.compareAndSet(null, Thread.currentThread())) {
88         return;
89       }
90       try {
91         Runnable runnable;
92         while ((runnable = queue.poll()) != null) {
93           try {
94             runnable.run();
95           } catch (Throwable t) {
96             uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t);
97           }
98         }
99       } finally {
100         drainingThread.set(null);
101       }
102       // must check queue again here to catch any added prior to clearing drainingThread
103     } while (!queue.isEmpty());
104   }
105 
106   /**
107    * Adds a task that will be run when {@link #drain} is called.
108    *
109    * <p>This is useful for cases where you want to enqueue a task while under a lock of your own,
110    * but don't want the tasks to be run under your lock (for fear of deadlock).  You can call {@link
111    * #executeLater} in the lock, and call {@link #drain} outside the lock.
112    */
executeLater(Runnable runnable)113   public final void executeLater(Runnable runnable) {
114     queue.add(checkNotNull(runnable, "runnable is null"));
115   }
116 
117   /**
118    * Adds a task and run it in this synchronization context as soon as possible.  The task may run
119    * inline.  If there are tasks that are previously queued by {@link #executeLater} but have not
120    * been run, this method will trigger them to be run before the given task.  This is equivalent to
121    * calling {@link #executeLater} immediately followed by {@link #drain}.
122    */
123   @Override
execute(Runnable task)124   public final void execute(Runnable task) {
125     executeLater(task);
126     drain();
127   }
128 
129   /**
130    * Throw {@link IllegalStateException} if this method is not called from this synchronization
131    * context.
132    */
throwIfNotInThisSynchronizationContext()133   public void throwIfNotInThisSynchronizationContext() {
134     checkState(Thread.currentThread() == drainingThread.get(),
135         "Not called from the SynchronizationContext");
136   }
137 
138   /**
139    * Schedules a task to be added and run via {@link #execute} after a delay.
140    *
141    * @param task the task being scheduled
142    * @param delay the delay
143    * @param unit the time unit for the delay
144    * @param timerService the {@code ScheduledExecutorService} that provides delayed execution
145    *
146    * @return an object for checking the status and/or cancel the scheduled task
147    */
schedule( final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService)148   public final ScheduledHandle schedule(
149       final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService) {
150     final ManagedRunnable runnable = new ManagedRunnable(task);
151     ScheduledFuture<?> future = timerService.schedule(new Runnable() {
152         @Override
153         public void run() {
154           execute(runnable);
155         }
156 
157         @Override
158         public String toString() {
159           return task.toString() + "(scheduled in SynchronizationContext)";
160         }
161       }, delay, unit);
162     return new ScheduledHandle(runnable, future);
163   }
164 
165   /**
166    * Schedules a task to be added and run via {@link #execute} after an inital delay and then
167    * repeated after the delay until cancelled.
168    *
169    * @param task the task being scheduled
170    * @param initialDelay the delay before the first run
171    * @param delay the delay after the first run.
172    * @param unit the time unit for the delay
173    * @param timerService the {@code ScheduledExecutorService} that provides delayed execution
174    *
175    * @return an object for checking the status and/or cancel the scheduled task
176    */
scheduleWithFixedDelay( final Runnable task, long initialDelay, long delay, TimeUnit unit, ScheduledExecutorService timerService)177   public final ScheduledHandle scheduleWithFixedDelay(
178       final Runnable task, long initialDelay, long delay, TimeUnit unit,
179       ScheduledExecutorService timerService) {
180     final ManagedRunnable runnable = new ManagedRunnable(task);
181     ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() {
182       @Override
183       public void run() {
184         execute(runnable);
185       }
186 
187       @Override
188       public String toString() {
189         return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay
190             + ")";
191       }
192     }, initialDelay, delay, unit);
193     return new ScheduledHandle(runnable, future);
194   }
195 
196 
197   private static class ManagedRunnable implements Runnable {
198     final Runnable task;
199     boolean isCancelled;
200     boolean hasStarted;
201 
ManagedRunnable(Runnable task)202     ManagedRunnable(Runnable task) {
203       this.task = checkNotNull(task, "task");
204     }
205 
206     @Override
run()207     public void run() {
208       // The task may have been cancelled after timerService calls SynchronizationContext.execute()
209       // but before the runnable is actually run.  We must guarantee that the task will not be run
210       // in this case.
211       if (!isCancelled) {
212         hasStarted = true;
213         task.run();
214       }
215     }
216   }
217 
218   /**
219    * Allows the user to check the status and/or cancel a task scheduled by {@link #schedule}.
220    *
221    * <p>This class is NOT thread-safe.  All methods must be run from the same {@link
222    * SynchronizationContext} as which the task was scheduled in.
223    */
224   public static final class ScheduledHandle {
225     private final ManagedRunnable runnable;
226     private final ScheduledFuture<?> future;
227 
ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future)228     private ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future) {
229       this.runnable = checkNotNull(runnable, "runnable");
230       this.future = checkNotNull(future, "future");
231     }
232 
233     /**
234      * Cancel the task if it's still {@link #isPending pending}.
235      */
cancel()236     public void cancel() {
237       runnable.isCancelled = true;
238       future.cancel(false);
239     }
240 
241     /**
242      * Returns {@code true} if the task will eventually run, meaning that it has neither started
243      * running nor been cancelled.
244      */
isPending()245     public boolean isPending() {
246       return !(runnable.hasStarted || runnable.isCancelled);
247     }
248   }
249 }
250