• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.base.Preconditions;
22 import com.google.common.collect.Queues;
23 
24 import java.util.Queue;
25 import java.util.concurrent.Executor;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
28 
29 import javax.annotation.concurrent.GuardedBy;
30 
31 /**
32  * A special purpose queue/executor that executes listener callbacks serially on a configured
33  * executor.  Each callback task can be enqueued and executed as separate phases.
34  *
35  * <p>This class is very similar to {@link SerializingExecutor} with the exception that tasks can
36  * be enqueued without necessarily executing immediately.
37  */
38 final class ListenerCallQueue<L> implements Runnable {
39   // TODO(cpovirk): consider using the logger associated with listener.getClass().
40   private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());
41 
42   abstract static class Callback<L> {
43     private final String methodCall;
44 
Callback(String methodCall)45     Callback(String methodCall) {
46       this.methodCall = methodCall;
47     }
48 
call(L listener)49     abstract void call(L listener);
50 
51     /** Helper method to add this callback to all the queues. */
enqueueOn(Iterable<ListenerCallQueue<L>> queues)52     void enqueueOn(Iterable<ListenerCallQueue<L>> queues) {
53       for (ListenerCallQueue<L> queue : queues) {
54         queue.add(this);
55       }
56     }
57   }
58 
59   private final L listener;
60   private final Executor executor;
61 
62   @GuardedBy("this") private final Queue<Callback<L>> waitQueue = Queues.newArrayDeque();
63   @GuardedBy("this") private boolean isThreadScheduled;
64 
ListenerCallQueue(L listener, Executor executor)65   ListenerCallQueue(L listener, Executor executor) {
66     this.listener = checkNotNull(listener);
67     this.executor = checkNotNull(executor);
68   }
69 
70   /** Enqueues a task to be run. */
add(Callback<L> callback)71   synchronized void add(Callback<L> callback) {
72     waitQueue.add(callback);
73   }
74 
75   /** Executes all listeners {@linkplain #add added} prior to this call, serially and in order.*/
execute()76   void execute() {
77     boolean scheduleTaskRunner = false;
78     synchronized (this) {
79       if (!isThreadScheduled) {
80         isThreadScheduled = true;
81         scheduleTaskRunner = true;
82       }
83     }
84     if (scheduleTaskRunner) {
85       try {
86         executor.execute(this);
87       } catch (RuntimeException e) {
88         // reset state in case of an error so that later calls to execute will actually do something
89         synchronized (this) {
90           isThreadScheduled = false;
91         }
92         // Log it and keep going.
93         logger.log(Level.SEVERE,
94             "Exception while running callbacks for " + listener + " on " + executor,
95             e);
96         throw e;
97       }
98     }
99   }
100 
run()101   @Override public void run() {
102     boolean stillRunning = true;
103     try {
104       while (true) {
105         Callback<L> nextToRun;
106         synchronized (ListenerCallQueue.this) {
107           Preconditions.checkState(isThreadScheduled);
108           nextToRun = waitQueue.poll();
109           if (nextToRun == null) {
110             isThreadScheduled = false;
111             stillRunning = false;
112             break;
113           }
114         }
115 
116         // Always run while _not_ holding the lock, to avoid deadlocks.
117         try {
118           nextToRun.call(listener);
119         } catch (RuntimeException e) {
120           // Log it and keep going.
121           logger.log(Level.SEVERE,
122               "Exception while executing callback: " + listener + "." + nextToRun.methodCall,
123               e);
124         }
125       }
126     } finally {
127       if (stillRunning) {
128         // An Error is bubbling up, we should mark ourselves as no longer
129         // running, that way if anyone tries to keep using us we won't be
130         // corrupted.
131         synchronized (ListenerCallQueue.this) {
132           isThreadScheduled = false;
133         }
134       }
135     }
136   }
137 }
138