1 /* 2 * Copyright 2017, OpenCensus Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.opencensus.impl.internal; 18 19 import com.lmax.disruptor.EventFactory; 20 import com.lmax.disruptor.EventHandler; 21 import com.lmax.disruptor.RingBuffer; 22 import com.lmax.disruptor.SleepingWaitStrategy; 23 import com.lmax.disruptor.dsl.Disruptor; 24 import com.lmax.disruptor.dsl.ProducerType; 25 import io.opencensus.implcore.internal.DaemonThreadFactory; 26 import io.opencensus.implcore.internal.EventQueue; 27 import java.util.concurrent.atomic.AtomicBoolean; 28 import java.util.logging.Level; 29 import java.util.logging.Logger; 30 import javax.annotation.Nullable; 31 import javax.annotation.concurrent.ThreadSafe; 32 33 /** 34 * A low-latency event queue for background updating of (possibly contended) objects. This is 35 * intended for use by instrumentation methods to ensure that they do not block foreground 36 * activities. To customize the action taken on reading the queue, derive a new class from {@link 37 * EventQueue.Entry} and pass it to the {@link #enqueue(Entry)} method. The {@link Entry#process()} 38 * method of your class will be called and executed in a background thread. This class is a 39 * Singleton. 40 * 41 * <p>Example Usage: Given a class as follows: 42 * 43 * <pre> 44 * public class someClass { 45 * public void doSomething() { 46 * // Do the work of the method. One result is a measurement of something. 47 * int measurement = doSomeWork(); 48 * // Make an update to the class state, based on this measurement. This work can take some 49 * // time, but can be done asynchronously, in the background. 50 * update(measurement); 51 * } 52 * 53 * public void update(int arg) { 54 * // do something 55 * } 56 * } 57 * </pre> 58 * 59 * <p>The work of calling {@code someClass.update()} can be executed in the backgound as follows: 60 * 61 * <pre> 62 * public class someClass { 63 * // Add a EventQueueEntry class that will process the update call. 64 * private static final class SomeClassUpdateEvent implements EventQueueEntry { 65 * private final SomeClass someClassInstance; 66 * private final int arg; 67 * 68 * SomeObjectUpdateEvent(SomeObject someClassInstance, int arg) { 69 * this.someClassInstance = someClassInstance; 70 * this.arg = arg; 71 * } 72 * 73 * @Override 74 * public void process() { 75 * someClassInstance.update(arg); 76 * } 77 * } 78 * 79 * public void doSomething() { 80 * int measurement = doSomeWork(); 81 * // Instead of calling update() directly, create an event to do the processing, and insert 82 * // it into the EventQueue. It will be processed in a background thread, and doSomething() 83 * // can return immediately. 84 * EventQueue.getInstance.enqueue(new SomeClassUpdateEvent(this, measurement)); 85 * } 86 * } 87 * </pre> 88 */ 89 @ThreadSafe 90 public final class DisruptorEventQueue implements EventQueue { 91 92 private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName()); 93 94 // Number of events that can be enqueued at any one time. If more than this are enqueued, 95 // then subsequent attempts to enqueue new entries will block. 96 // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be 97 // configured to a size appropriate to the system (smaller/less busy systems will not need as 98 // large a queue. 99 private static final int DISRUPTOR_BUFFER_SIZE = 8192; 100 // The single instance of the class. 101 private static final DisruptorEventQueue eventQueue = create(); 102 103 // The event queue is built on this {@link Disruptor}. 104 private final Disruptor<DisruptorEvent> disruptor; 105 // Ring Buffer for the {@link Disruptor} that underlies the queue. 106 private final RingBuffer<DisruptorEvent> ringBuffer; 107 108 private volatile DisruptorEnqueuer enqueuer; 109 110 // Creates a new EventQueue. Private to prevent creation of non-singleton instance. DisruptorEventQueue( Disruptor<DisruptorEvent> disruptor, RingBuffer<DisruptorEvent> ringBuffer, DisruptorEnqueuer enqueuer)111 private DisruptorEventQueue( 112 Disruptor<DisruptorEvent> disruptor, 113 RingBuffer<DisruptorEvent> ringBuffer, 114 DisruptorEnqueuer enqueuer) { 115 this.disruptor = disruptor; 116 this.ringBuffer = ringBuffer; 117 this.enqueuer = enqueuer; 118 } 119 120 // Creates a new EventQueue. Private to prevent creation of non-singleton instance. create()121 private static DisruptorEventQueue create() { 122 // Create new Disruptor for processing. Note that Disruptor creates a single thread per 123 // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details); 124 // this ensures that the event handler can take unsynchronized actions whenever possible. 125 Disruptor<DisruptorEvent> disruptor = 126 new Disruptor<>( 127 DisruptorEventFactory.INSTANCE, 128 DISRUPTOR_BUFFER_SIZE, 129 new DaemonThreadFactory("OpenCensus.Disruptor"), 130 ProducerType.MULTI, 131 new SleepingWaitStrategy()); 132 disruptor.handleEventsWith(new DisruptorEventHandler[] {DisruptorEventHandler.INSTANCE}); 133 disruptor.start(); 134 final RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer(); 135 136 DisruptorEnqueuer enqueuer = 137 new DisruptorEnqueuer() { 138 @Override 139 public void enqueue(Entry entry) { 140 long sequence = ringBuffer.next(); 141 try { 142 DisruptorEvent event = ringBuffer.get(sequence); 143 event.setEntry(entry); 144 } finally { 145 ringBuffer.publish(sequence); 146 } 147 } 148 }; 149 return new DisruptorEventQueue(disruptor, ringBuffer, enqueuer); 150 } 151 152 /** 153 * Returns the {@link DisruptorEventQueue} instance. 154 * 155 * @return the singleton {@code EventQueue} instance. 156 */ getInstance()157 public static DisruptorEventQueue getInstance() { 158 return eventQueue; 159 } 160 161 /** 162 * Enqueues an event on the {@link DisruptorEventQueue}. 163 * 164 * @param entry a class encapsulating the actions to be taken for event processing. 165 */ 166 @Override enqueue(Entry entry)167 public void enqueue(Entry entry) { 168 enqueuer.enqueue(entry); 169 } 170 171 /** Shuts down the underlying disruptor. */ 172 @Override shutdown()173 public void shutdown() { 174 enqueuer = 175 new DisruptorEnqueuer() { 176 final AtomicBoolean logged = new AtomicBoolean(false); 177 178 @Override 179 public void enqueue(Entry entry) { 180 if (!logged.getAndSet(true)) { 181 logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown."); 182 } 183 } 184 }; 185 186 disruptor.shutdown(); 187 } 188 189 // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer 190 private abstract static class DisruptorEnqueuer { 191 enqueue(Entry entry)192 public abstract void enqueue(Entry entry); 193 } 194 195 // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry. 196 private static final class DisruptorEvent { 197 198 // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so 199 // intuitively this variable must be volatile. 200 @Nullable private volatile Entry entry = null; 201 202 // Sets the EventQueueEntry associated with this DisruptorEvent. setEntry(@ullable Entry entry)203 void setEntry(@Nullable Entry entry) { 204 this.entry = entry; 205 } 206 207 // Returns the EventQueueEntry associated with this DisruptorEvent. 208 @Nullable getEntry()209 Entry getEntry() { 210 return entry; 211 } 212 } 213 214 // Factory for DisruptorEvent. 215 private enum DisruptorEventFactory implements EventFactory<DisruptorEvent> { 216 INSTANCE; 217 218 @Override newInstance()219 public DisruptorEvent newInstance() { 220 return new DisruptorEvent(); 221 } 222 } 223 224 /** 225 * Every event that gets added to {@link EventQueue} will get processed here. Just calls the 226 * underlying process() method. 227 */ 228 private enum DisruptorEventHandler implements EventHandler<DisruptorEvent> { 229 INSTANCE; 230 231 @Override onEvent(DisruptorEvent event, long sequence, boolean endOfBatch)232 public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) { 233 Entry entry = event.getEntry(); 234 if (entry != null) { 235 entry.process(); 236 } 237 // Remove the reference to the previous entry to allow the memory to be gc'ed. 238 event.setEntry(null); 239 } 240 } 241 } 242