1 /* 2 * Copyright 2018 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import java.lang.Thread.UncaughtExceptionHandler; 23 import java.util.Queue; 24 import java.util.concurrent.ConcurrentLinkedQueue; 25 import java.util.concurrent.Executor; 26 import java.util.concurrent.ScheduledExecutorService; 27 import java.util.concurrent.ScheduledFuture; 28 import java.util.concurrent.TimeUnit; 29 import java.util.concurrent.atomic.AtomicReference; 30 import javax.annotation.concurrent.ThreadSafe; 31 32 /** 33 * A synchronization context is a queue of tasks that run in sequence. It offers following 34 * guarantees: 35 * 36 * <ul> 37 * <li>Ordering. Tasks are run in the same order as they are submitted via {@link #execute} 38 * and {@link #executeLater}.</li> 39 * <li>Serialization. Tasks are run in sequence and establish a happens-before relationship 40 * between them. </li> 41 * <li>Non-reentrancy. If a task running in a synchronization context executes or schedules 42 * another task in the same synchronization context, the latter task will never run 43 * inline. It will instead be queued and run only after the current task has returned.</li> 44 * </ul> 45 * 46 * <p>It doesn't own any thread. Tasks are run from caller's or caller-provided threads. 47 * 48 * <p>Conceptually, it is fairly accurate to think of {@code SynchronizationContext} like a cheaper 49 * {@code Executors.newSingleThreadExecutor()} when used for synchronization (not long-running 50 * tasks). Both use a queue for tasks that are run in order and neither guarantee that tasks have 51 * completed before returning from {@code execute()}. However, the behavior does diverge if locks 52 * are held when calling the context. So it is encouraged to avoid mixing locks and synchronization 53 * context except via {@link #executeLater}. 54 * 55 * <p>This class is thread-safe. 56 * 57 * @since 1.17.0 58 */ 59 @ThreadSafe 60 public final class SynchronizationContext implements Executor { 61 private final UncaughtExceptionHandler uncaughtExceptionHandler; 62 63 private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); 64 private final AtomicReference<Thread> drainingThread = new AtomicReference<>(); 65 66 /** 67 * Creates a SynchronizationContext. 68 * 69 * @param uncaughtExceptionHandler handles exceptions thrown out of the tasks. Different from 70 * what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is 71 * not terminated when the handler is called. 72 */ SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler)73 public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) { 74 this.uncaughtExceptionHandler = 75 checkNotNull(uncaughtExceptionHandler, "uncaughtExceptionHandler"); 76 } 77 78 /** 79 * Run all tasks in the queue in the current thread, if no other thread is running this method. 80 * Otherwise do nothing. 81 * 82 * <p>Upon returning, it guarantees that all tasks submitted by {@code #executeLater} before it 83 * have been or will eventually be run, while not requiring any more calls to {@code drain()}. 84 */ drain()85 public final void drain() { 86 do { 87 if (!drainingThread.compareAndSet(null, Thread.currentThread())) { 88 return; 89 } 90 try { 91 Runnable runnable; 92 while ((runnable = queue.poll()) != null) { 93 try { 94 runnable.run(); 95 } catch (Throwable t) { 96 uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t); 97 } 98 } 99 } finally { 100 drainingThread.set(null); 101 } 102 // must check queue again here to catch any added prior to clearing drainingThread 103 } while (!queue.isEmpty()); 104 } 105 106 /** 107 * Adds a task that will be run when {@link #drain} is called. 108 * 109 * <p>This is useful for cases where you want to enqueue a task while under a lock of your own, 110 * but don't want the tasks to be run under your lock (for fear of deadlock). You can call {@link 111 * #executeLater} in the lock, and call {@link #drain} outside the lock. 112 */ executeLater(Runnable runnable)113 public final void executeLater(Runnable runnable) { 114 queue.add(checkNotNull(runnable, "runnable is null")); 115 } 116 117 /** 118 * Adds a task and run it in this synchronization context as soon as possible. The task may run 119 * inline. If there are tasks that are previously queued by {@link #executeLater} but have not 120 * been run, this method will trigger them to be run before the given task. This is equivalent to 121 * calling {@link #executeLater} immediately followed by {@link #drain}. 122 */ 123 @Override execute(Runnable task)124 public final void execute(Runnable task) { 125 executeLater(task); 126 drain(); 127 } 128 129 /** 130 * Throw {@link IllegalStateException} if this method is not called from this synchronization 131 * context. 132 */ throwIfNotInThisSynchronizationContext()133 public void throwIfNotInThisSynchronizationContext() { 134 checkState(Thread.currentThread() == drainingThread.get(), 135 "Not called from the SynchronizationContext"); 136 } 137 138 /** 139 * Schedules a task to be added and run via {@link #execute} after a delay. 140 * 141 * @param task the task being scheduled 142 * @param delay the delay 143 * @param unit the time unit for the delay 144 * @param timerService the {@code ScheduledExecutorService} that provides delayed execution 145 * 146 * @return an object for checking the status and/or cancel the scheduled task 147 */ schedule( final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService)148 public final ScheduledHandle schedule( 149 final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService) { 150 final ManagedRunnable runnable = new ManagedRunnable(task); 151 ScheduledFuture<?> future = timerService.schedule(new Runnable() { 152 @Override 153 public void run() { 154 execute(runnable); 155 } 156 157 @Override 158 public String toString() { 159 return task.toString() + "(scheduled in SynchronizationContext)"; 160 } 161 }, delay, unit); 162 return new ScheduledHandle(runnable, future); 163 } 164 165 /** 166 * Schedules a task to be added and run via {@link #execute} after an inital delay and then 167 * repeated after the delay until cancelled. 168 * 169 * @param task the task being scheduled 170 * @param initialDelay the delay before the first run 171 * @param delay the delay after the first run. 172 * @param unit the time unit for the delay 173 * @param timerService the {@code ScheduledExecutorService} that provides delayed execution 174 * 175 * @return an object for checking the status and/or cancel the scheduled task 176 */ scheduleWithFixedDelay( final Runnable task, long initialDelay, long delay, TimeUnit unit, ScheduledExecutorService timerService)177 public final ScheduledHandle scheduleWithFixedDelay( 178 final Runnable task, long initialDelay, long delay, TimeUnit unit, 179 ScheduledExecutorService timerService) { 180 final ManagedRunnable runnable = new ManagedRunnable(task); 181 ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() { 182 @Override 183 public void run() { 184 execute(runnable); 185 } 186 187 @Override 188 public String toString() { 189 return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay 190 + ")"; 191 } 192 }, initialDelay, delay, unit); 193 return new ScheduledHandle(runnable, future); 194 } 195 196 197 private static class ManagedRunnable implements Runnable { 198 final Runnable task; 199 boolean isCancelled; 200 boolean hasStarted; 201 ManagedRunnable(Runnable task)202 ManagedRunnable(Runnable task) { 203 this.task = checkNotNull(task, "task"); 204 } 205 206 @Override run()207 public void run() { 208 // The task may have been cancelled after timerService calls SynchronizationContext.execute() 209 // but before the runnable is actually run. We must guarantee that the task will not be run 210 // in this case. 211 if (!isCancelled) { 212 hasStarted = true; 213 task.run(); 214 } 215 } 216 } 217 218 /** 219 * Allows the user to check the status and/or cancel a task scheduled by {@link #schedule}. 220 * 221 * <p>This class is NOT thread-safe. All methods must be run from the same {@link 222 * SynchronizationContext} as which the task was scheduled in. 223 */ 224 public static final class ScheduledHandle { 225 private final ManagedRunnable runnable; 226 private final ScheduledFuture<?> future; 227 ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future)228 private ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future) { 229 this.runnable = checkNotNull(runnable, "runnable"); 230 this.future = checkNotNull(future, "future"); 231 } 232 233 /** 234 * Cancel the task if it's still {@link #isPending pending}. 235 */ cancel()236 public void cancel() { 237 runnable.isCancelled = true; 238 future.cancel(false); 239 } 240 241 /** 242 * Returns {@code true} if the task will eventually run, meaning that it has neither started 243 * running nor been cancelled. 244 */ isPending()245 public boolean isPending() { 246 return !(runnable.hasStarted || runnable.isCancelled); 247 } 248 } 249 } 250