1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.base.Preconditions; 20 21 import java.util.LinkedList; 22 import java.util.Queue; 23 import java.util.concurrent.Executor; 24 import java.util.logging.Level; 25 import java.util.logging.Logger; 26 27 import javax.annotation.concurrent.GuardedBy; 28 29 /** 30 * Executor ensuring that all Runnables submitted are executed in order, 31 * using the provided Executor, and serially such that no two will ever 32 * be running at the same time. 33 * 34 * TODO(user): The tasks are given to the underlying executor as a single 35 * task, which means the semantics of the executor may be changed, e.g. the 36 * executor may have an afterExecute method that runs after every task 37 * 38 * TODO(user): What happens in case of shutdown or shutdownNow? Should 39 * TaskRunner check for interruption? 40 * 41 * TODO(user): It would be nice to provide a handle to individual task 42 * results using Future. Maybe SerializingExecutorService? 43 * 44 * @author JJ Furman 45 */ 46 final class SerializingExecutor implements Executor { 47 private static final Logger log = 48 Logger.getLogger(SerializingExecutor.class.getName()); 49 50 /** Underlying executor that all submitted Runnable objects are run on. */ 51 private final Executor executor; 52 53 /** A list of Runnables to be run in order. */ 54 @GuardedBy("internalLock") 55 private final Queue<Runnable> waitQueue = new LinkedList<Runnable>(); 56 57 /** 58 * We explicitly keep track of if the TaskRunner is currently scheduled to 59 * run. If it isn't, we start it. We can't just use 60 * waitQueue.isEmpty() as a proxy because we need to ensure that only one 61 * Runnable submitted is running at a time so even if waitQueue is empty 62 * the isThreadScheduled isn't set to false until after the Runnable is 63 * finished. 64 */ 65 @GuardedBy("internalLock") 66 private boolean isThreadScheduled = false; 67 68 /** The object that actually runs the Runnables submitted, reused. */ 69 private final TaskRunner taskRunner = new TaskRunner(); 70 71 /** 72 * Creates a SerializingExecutor, running tasks using {@code executor}. 73 * 74 * @param executor Executor in which tasks should be run. Must not be null. 75 */ SerializingExecutor(Executor executor)76 public SerializingExecutor(Executor executor) { 77 Preconditions.checkNotNull(executor, "'executor' must not be null."); 78 this.executor = executor; 79 } 80 81 private final Object internalLock = new Object() { 82 @Override public String toString() { 83 return "SerializingExecutor lock: " + super.toString(); 84 } 85 }; 86 87 /** 88 * Runs the given runnable strictly after all Runnables that were submitted 89 * before it, and using the {@code executor} passed to the constructor. . 90 */ 91 @Override execute(Runnable r)92 public void execute(Runnable r) { 93 Preconditions.checkNotNull(r, "'r' must not be null."); 94 boolean scheduleTaskRunner = false; 95 synchronized (internalLock) { 96 waitQueue.add(r); 97 98 if (!isThreadScheduled) { 99 isThreadScheduled = true; 100 scheduleTaskRunner = true; 101 } 102 } 103 if (scheduleTaskRunner) { 104 boolean threw = true; 105 try { 106 executor.execute(taskRunner); 107 threw = false; 108 } finally { 109 if (threw) { 110 synchronized (internalLock) { 111 // It is possible that at this point that there are still tasks in 112 // the queue, it would be nice to keep trying but the error may not 113 // be recoverable. So we update our state and propogate so that if 114 // our caller deems it recoverable we won't be stuck. 115 isThreadScheduled = false; 116 } 117 } 118 } 119 } 120 } 121 122 /** 123 * Task that actually runs the Runnables. It takes the Runnables off of the 124 * queue one by one and runs them. After it is done with all Runnables and 125 * there are no more to run, puts the SerializingExecutor in the state where 126 * isThreadScheduled = false and returns. This allows the current worker 127 * thread to return to the original pool. 128 */ 129 private class TaskRunner implements Runnable { 130 @Override run()131 public void run() { 132 boolean stillRunning = true; 133 try { 134 while (true) { 135 Preconditions.checkState(isThreadScheduled); 136 Runnable nextToRun; 137 synchronized (internalLock) { 138 nextToRun = waitQueue.poll(); 139 if (nextToRun == null) { 140 isThreadScheduled = false; 141 stillRunning = false; 142 break; 143 } 144 } 145 146 // Always run while not holding the lock, to avoid deadlocks. 147 try { 148 nextToRun.run(); 149 } catch (RuntimeException e) { 150 // Log it and keep going. 151 log.log(Level.SEVERE, "Exception while executing runnable " 152 + nextToRun, e); 153 } 154 } 155 } finally { 156 if (stillRunning) { 157 // An Error is bubbling up, we should mark ourselves as no longer 158 // running, that way if anyone tries to keep using us we won't be 159 // corrupted. 160 synchronized (internalLock) { 161 isThreadScheduled = false; 162 } 163 } 164 } 165 } 166 } 167 } 168