• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2007 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.eventbus;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.annotations.Beta;
22 
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Executor;
25 
26 /**
27  * An {@link EventBus} that takes the Executor of your choice and uses it to
28  * dispatch events, allowing dispatch to occur asynchronously.
29  *
30  * @author Cliff Biffle
31  * @since 10.0
32  */
33 @Beta
34 public class AsyncEventBus extends EventBus {
35   private final Executor executor;
36 
37   /** the queue of events is shared across all threads */
38   private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch =
39       new ConcurrentLinkedQueue<EventWithSubscriber>();
40 
41   /**
42    * Creates a new AsyncEventBus that will use {@code executor} to dispatch
43    * events.  Assigns {@code identifier} as the bus's name for logging purposes.
44    *
45    * @param identifier short name for the bus, for logging purposes.
46    * @param executor   Executor to use to dispatch events. It is the caller's
47    *        responsibility to shut down the executor after the last event has
48    *        been posted to this event bus.
49    */
AsyncEventBus(String identifier, Executor executor)50   public AsyncEventBus(String identifier, Executor executor) {
51     super(identifier);
52     this.executor = checkNotNull(executor);
53   }
54 
55   /**
56    * Creates a new AsyncEventBus that will use {@code executor} to dispatch
57    * events.
58    *
59    * @param executor Executor to use to dispatch events. It is the caller's
60    *        responsibility to shut down the executor after the last event has
61    *        been posted to this event bus.
62    * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
63    *    See {@link SubscriberExceptionHandler} for more information.
64    * @since 16.0
65    */
AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler)66   public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
67     super(subscriberExceptionHandler);
68     this.executor = checkNotNull(executor);
69   }
70 
71   /**
72    * Creates a new AsyncEventBus that will use {@code executor} to dispatch
73    * events.
74    *
75    * @param executor Executor to use to dispatch events. It is the caller's
76    *        responsibility to shut down the executor after the last event has
77    *        been posted to this event bus.
78    */
AsyncEventBus(Executor executor)79   public AsyncEventBus(Executor executor) {
80     super("default");
81     this.executor = checkNotNull(executor);
82   }
83 
84   @Override
enqueueEvent(Object event, EventSubscriber subscriber)85   void enqueueEvent(Object event, EventSubscriber subscriber) {
86     eventsToDispatch.offer(new EventWithSubscriber(event, subscriber));
87   }
88 
89   /**
90    * Dispatch {@code events} in the order they were posted, regardless of
91    * the posting thread.
92    */
93   @SuppressWarnings("deprecation") // only deprecated for external subclasses
94   @Override
dispatchQueuedEvents()95   protected void dispatchQueuedEvents() {
96     while (true) {
97       EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll();
98       if (eventWithSubscriber == null) {
99         break;
100       }
101 
102       dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
103     }
104   }
105 
106   /**
107    * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}.
108    */
109   @Override
dispatch(final Object event, final EventSubscriber subscriber)110   void dispatch(final Object event, final EventSubscriber subscriber) {
111     checkNotNull(event);
112     checkNotNull(subscriber);
113     executor.execute(
114         new Runnable() {
115           @Override
116           public void run() {
117             AsyncEventBus.super.dispatch(event, subscriber);
118           }
119         });
120   }
121 }
122