• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.util.concurrent;
18 
import com.google.common.base.Preconditions;

import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.concurrent.GuardedBy;
28 
29 /**
30  * Executor ensuring that all Runnables submitted are executed in order,
31  * using the provided Executor, and serially such that no two will ever
32  * be running at the same time.
33  *
34  * TODO(user): The tasks are given to the underlying executor as a single
35  * task, which means the semantics of the executor may be changed, e.g. the
36  * executor may have an afterExecute method that runs after every task
37  *
38  * TODO(user): What happens in case of shutdown or shutdownNow?  Should
39  * TaskRunner check for interruption?
40  *
41  * TODO(user): It would be nice to provide a handle to individual task
42  * results using Future.  Maybe SerializingExecutorService?
43  *
44  * @author JJ Furman
45  */
46 final class SerializingExecutor implements Executor {
47   private static final Logger log =
48       Logger.getLogger(SerializingExecutor.class.getName());
49 
50   /** Underlying executor that all submitted Runnable objects are run on. */
51   private final Executor executor;
52 
53   /** A list of Runnables to be run in order. */
54   @GuardedBy("internalLock")
55   private final Queue<Runnable> waitQueue = new LinkedList<Runnable>();
56 
57   /**
58    * We explicitly keep track of if the TaskRunner is currently scheduled to
59    * run.  If it isn't, we start it.  We can't just use
60    * waitQueue.isEmpty() as a proxy because we need to ensure that only one
61    * Runnable submitted is running at a time so even if waitQueue is empty
62    * the isThreadScheduled isn't set to false until after the Runnable is
63    * finished.
64    */
65   @GuardedBy("internalLock")
66   private boolean isThreadScheduled = false;
67 
68   /** The object that actually runs the Runnables submitted, reused. */
69   private final TaskRunner taskRunner = new TaskRunner();
70 
71   /**
72    * Creates a SerializingExecutor, running tasks using {@code executor}.
73    *
74    * @param executor Executor in which tasks should be run. Must not be null.
75    */
SerializingExecutor(Executor executor)76   public SerializingExecutor(Executor executor) {
77     Preconditions.checkNotNull(executor, "'executor' must not be null.");
78     this.executor = executor;
79   }
80 
81   private final Object internalLock = new Object() {
82     @Override public String toString() {
83       return "SerializingExecutor lock: " + super.toString();
84     }
85   };
86 
87   /**
88    * Runs the given runnable strictly after all Runnables that were submitted
89    * before it, and using the {@code executor} passed to the constructor.     .
90    */
91   @Override
execute(Runnable r)92   public void execute(Runnable r) {
93     Preconditions.checkNotNull(r, "'r' must not be null.");
94     boolean scheduleTaskRunner = false;
95     synchronized (internalLock) {
96       waitQueue.add(r);
97 
98       if (!isThreadScheduled) {
99         isThreadScheduled = true;
100         scheduleTaskRunner = true;
101       }
102     }
103     if (scheduleTaskRunner) {
104       boolean threw = true;
105       try {
106         executor.execute(taskRunner);
107         threw = false;
108       } finally {
109         if (threw) {
110           synchronized (internalLock) {
111             // It is possible that at this point that there are still tasks in
112             // the queue, it would be nice to keep trying but the error may not
113             // be recoverable.  So we update our state and propogate so that if
114             // our caller deems it recoverable we won't be stuck.
115             isThreadScheduled = false;
116           }
117         }
118       }
119     }
120   }
121 
122   /**
123    * Task that actually runs the Runnables.  It takes the Runnables off of the
124    * queue one by one and runs them.  After it is done with all Runnables and
125    * there are no more to run, puts the SerializingExecutor in the state where
126    * isThreadScheduled = false and returns.  This allows the current worker
127    * thread to return to the original pool.
128    */
129   private class TaskRunner implements Runnable {
130     @Override
run()131     public void run() {
132       boolean stillRunning = true;
133       try {
134         while (true) {
135           Preconditions.checkState(isThreadScheduled);
136           Runnable nextToRun;
137           synchronized (internalLock) {
138             nextToRun = waitQueue.poll();
139             if (nextToRun == null) {
140               isThreadScheduled = false;
141               stillRunning = false;
142               break;
143             }
144           }
145 
146           // Always run while not holding the lock, to avoid deadlocks.
147           try {
148             nextToRun.run();
149           } catch (RuntimeException e) {
150             // Log it and keep going.
151             log.log(Level.SEVERE, "Exception while executing runnable "
152                 + nextToRun, e);
153           }
154         }
155       } finally {
156         if (stillRunning) {
157           // An Error is bubbling up, we should mark ourselves as no longer
158           // running, that way if anyone tries to keep using us we won't be
159           // corrupted.
160           synchronized (internalLock) {
161             isThreadScheduled = false;
162           }
163         }
164       }
165     }
166   }
167 }
168