/* * Copyright 2018 The gRPC Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.grpc; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.ThreadSafe; /** * A synchronization context is a queue of tasks that run in sequence. It offers following * guarantees: * *
It doesn't own any thread. Tasks are run from caller's or caller-provided threads. * *
Conceptually, it is fairly accurate to think of {@code SynchronizationContext} like a cheaper * {@code Executors.newSingleThreadExecutor()} when used for synchronization (not long-running * tasks). Both use a queue for tasks that are run in order and neither guarantee that tasks have * completed before returning from {@code execute()}. However, the behavior does diverge if locks * are held when calling the context. So it is encouraged to avoid mixing locks and synchronization * context except via {@link #executeLater}. * *
This class is thread-safe.
*
* @since 1.17.0
*/
@ThreadSafe
public final class SynchronizationContext implements Executor {
private final UncaughtExceptionHandler uncaughtExceptionHandler;
private final Queue Upon returning, it guarantees that all tasks submitted by {@code #executeLater} before it
* have been or will eventually be run, while not requiring any more calls to {@code drain()}.
*/
public final void drain() {
do {
if (!drainingThread.compareAndSet(null, Thread.currentThread())) {
return;
}
try {
Runnable runnable;
while ((runnable = queue.poll()) != null) {
try {
runnable.run();
} catch (Throwable t) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t);
}
}
} finally {
drainingThread.set(null);
}
// must check queue again here to catch any added prior to clearing drainingThread
} while (!queue.isEmpty());
}
/**
* Adds a task that will be run when {@link #drain} is called.
*
* This is useful for cases where you want to enqueue a task while under a lock of your own,
* but don't want the tasks to be run under your lock (for fear of deadlock). You can call {@link
* #executeLater} in the lock, and call {@link #drain} outside the lock.
*/
public final void executeLater(Runnable runnable) {
queue.add(checkNotNull(runnable, "runnable is null"));
}
/**
* Adds a task and run it in this synchronization context as soon as possible. The task may run
* inline. If there are tasks that are previously queued by {@link #executeLater} but have not
* been run, this method will trigger them to be run before the given task. This is equivalent to
* calling {@link #executeLater} immediately followed by {@link #drain}.
*/
@Override
public final void execute(Runnable task) {
executeLater(task);
drain();
}
/**
* Throw {@link IllegalStateException} if this method is not called from this synchronization
* context.
*/
public void throwIfNotInThisSynchronizationContext() {
checkState(Thread.currentThread() == drainingThread.get(),
"Not called from the SynchronizationContext");
}
/**
* Schedules a task to be added and run via {@link #execute} after a delay.
*
* @param task the task being scheduled
* @param delay the delay
* @param unit the time unit for the delay
* @param timerService the {@code ScheduledExecutorService} that provides delayed execution
*
* @return an object for checking the status and/or cancel the scheduled task
*/
public final ScheduledHandle schedule(
final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService) {
final ManagedRunnable runnable = new ManagedRunnable(task);
ScheduledFuture> future = timerService.schedule(new Runnable() {
@Override
public void run() {
execute(runnable);
}
@Override
public String toString() {
return task.toString() + "(scheduled in SynchronizationContext)";
}
}, delay, unit);
return new ScheduledHandle(runnable, future);
}
/**
* Schedules a task to be added and run via {@link #execute} after an inital delay and then
* repeated after the delay until cancelled.
*
* @param task the task being scheduled
* @param initialDelay the delay before the first run
* @param delay the delay after the first run.
* @param unit the time unit for the delay
* @param timerService the {@code ScheduledExecutorService} that provides delayed execution
*
* @return an object for checking the status and/or cancel the scheduled task
*/
public final ScheduledHandle scheduleWithFixedDelay(
final Runnable task, long initialDelay, long delay, TimeUnit unit,
ScheduledExecutorService timerService) {
final ManagedRunnable runnable = new ManagedRunnable(task);
ScheduledFuture> future = timerService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
execute(runnable);
}
@Override
public String toString() {
return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay
+ ")";
}
}, initialDelay, delay, unit);
return new ScheduledHandle(runnable, future);
}
private static class ManagedRunnable implements Runnable {
final Runnable task;
boolean isCancelled;
boolean hasStarted;
ManagedRunnable(Runnable task) {
this.task = checkNotNull(task, "task");
}
@Override
public void run() {
// The task may have been cancelled after timerService calls SynchronizationContext.execute()
// but before the runnable is actually run. We must guarantee that the task will not be run
// in this case.
if (!isCancelled) {
hasStarted = true;
task.run();
}
}
}
/**
* Allows the user to check the status and/or cancel a task scheduled by {@link #schedule}.
*
* This class is NOT thread-safe. All methods must be run from the same {@link
* SynchronizationContext} as which the task was scheduled in.
*/
public static final class ScheduledHandle {
private final ManagedRunnable runnable;
private final ScheduledFuture> future;
private ScheduledHandle(ManagedRunnable runnable, ScheduledFuture> future) {
this.runnable = checkNotNull(runnable, "runnable");
this.future = checkNotNull(future, "future");
}
/**
* Cancel the task if it's still {@link #isPending pending}.
*/
public void cancel() {
runnable.isCancelled = true;
future.cancel(false);
}
/**
* Returns {@code true} if the task will eventually run, meaning that it has neither started
* running nor been cancelled.
*/
public boolean isPending() {
return !(runnable.hasStarted || runnable.isCancelled);
}
}
}