1 /* 2 * Copyright (C) 2007 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.eventbus; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.annotations.Beta; 22 23 import java.util.concurrent.ConcurrentLinkedQueue; 24 import java.util.concurrent.Executor; 25 26 /** 27 * An {@link EventBus} that takes the Executor of your choice and uses it to 28 * dispatch events, allowing dispatch to occur asynchronously. 29 * 30 * @author Cliff Biffle 31 * @since 10.0 32 */ 33 @Beta 34 public class AsyncEventBus extends EventBus { 35 private final Executor executor; 36 37 /** the queue of events is shared across all threads */ 38 private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch = 39 new ConcurrentLinkedQueue<EventWithSubscriber>(); 40 41 /** 42 * Creates a new AsyncEventBus that will use {@code executor} to dispatch 43 * events. Assigns {@code identifier} as the bus's name for logging purposes. 44 * 45 * @param identifier short name for the bus, for logging purposes. 46 * @param executor Executor to use to dispatch events. It is the caller's 47 * responsibility to shut down the executor after the last event has 48 * been posted to this event bus. 49 */ AsyncEventBus(String identifier, Executor executor)50 public AsyncEventBus(String identifier, Executor executor) { 51 super(identifier); 52 this.executor = checkNotNull(executor); 53 } 54 55 /** 56 * Creates a new AsyncEventBus that will use {@code executor} to dispatch 57 * events. 58 * 59 * @param executor Executor to use to dispatch events. It is the caller's 60 * responsibility to shut down the executor after the last event has 61 * been posted to this event bus. 62 * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. 63 * See {@link SubscriberExceptionHandler} for more information. 64 * @since 16.0 65 */ AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler)66 public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { 67 super(subscriberExceptionHandler); 68 this.executor = checkNotNull(executor); 69 } 70 71 /** 72 * Creates a new AsyncEventBus that will use {@code executor} to dispatch 73 * events. 74 * 75 * @param executor Executor to use to dispatch events. It is the caller's 76 * responsibility to shut down the executor after the last event has 77 * been posted to this event bus. 78 */ AsyncEventBus(Executor executor)79 public AsyncEventBus(Executor executor) { 80 super("default"); 81 this.executor = checkNotNull(executor); 82 } 83 84 @Override enqueueEvent(Object event, EventSubscriber subscriber)85 void enqueueEvent(Object event, EventSubscriber subscriber) { 86 eventsToDispatch.offer(new EventWithSubscriber(event, subscriber)); 87 } 88 89 /** 90 * Dispatch {@code events} in the order they were posted, regardless of 91 * the posting thread. 92 */ 93 @SuppressWarnings("deprecation") // only deprecated for external subclasses 94 @Override dispatchQueuedEvents()95 protected void dispatchQueuedEvents() { 96 while (true) { 97 EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll(); 98 if (eventWithSubscriber == null) { 99 break; 100 } 101 102 dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber); 103 } 104 } 105 106 /** 107 * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}. 108 */ 109 @Override dispatch(final Object event, final EventSubscriber subscriber)110 void dispatch(final Object event, final EventSubscriber subscriber) { 111 checkNotNull(event); 112 checkNotNull(subscriber); 113 executor.execute( 114 new Runnable() { 115 @Override 116 public void run() { 117 AsyncEventBus.super.dispatch(event, subscriber); 118 } 119 }); 120 } 121 } 122