1 /* 2 * Copyright 2017, OpenCensus Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.opencensus.impl.internal; 18 19 import com.lmax.disruptor.EventFactory; 20 import com.lmax.disruptor.EventHandler; 21 import com.lmax.disruptor.RingBuffer; 22 import com.lmax.disruptor.SleepingWaitStrategy; 23 import com.lmax.disruptor.dsl.Disruptor; 24 import com.lmax.disruptor.dsl.ProducerType; 25 import io.opencensus.implcore.internal.DaemonThreadFactory; 26 import io.opencensus.implcore.internal.EventQueue; 27 import java.util.concurrent.atomic.AtomicBoolean; 28 import java.util.logging.Level; 29 import java.util.logging.Logger; 30 import javax.annotation.Nullable; 31 import javax.annotation.concurrent.ThreadSafe; 32 33 /** 34 * A low-latency event queue for background updating of (possibly contended) objects. This is 35 * intended for use by instrumentation methods to ensure that they do not block foreground 36 * activities. To customize the action taken on reading the queue, derive a new class from {@link 37 * EventQueue.Entry} and pass it to the {@link #enqueue(Entry)} method. The {@link Entry#process()} 38 * method of your class will be called and executed in a background thread. This class is a 39 * Singleton. 40 * 41 * <p>Example Usage: Given a class as follows: 42 * 43 * <pre> 44 * public class someClass { 45 * public void doSomething() { 46 * // Do the work of the method. One result is a measurement of something. 47 * int measurement = doSomeWork(); 48 * // Make an update to the class state, based on this measurement. This work can take some 49 * // time, but can be done asynchronously, in the background. 50 * update(measurement); 51 * } 52 * 53 * public void update(int arg) { 54 * // do something 55 * } 56 * } 57 * </pre> 58 * 59 * <p>The work of calling {@code someClass.update()} can be executed in the backgound as follows: 60 * 61 * <pre> 62 * public class someClass { 63 * // Add a EventQueueEntry class that will process the update call. 64 * private static final class SomeClassUpdateEvent implements EventQueueEntry { 65 * private final SomeClass someClassInstance; 66 * private final int arg; 67 * 68 * SomeObjectUpdateEvent(SomeObject someClassInstance, int arg) { 69 * this.someClassInstance = someClassInstance; 70 * this.arg = arg; 71 * } 72 * 73 * @Override 74 * public void process() { 75 * someClassInstance.update(arg); 76 * } 77 * } 78 * 79 * public void doSomething() { 80 * int measurement = doSomeWork(); 81 * // Instead of calling update() directly, create an event to do the processing, and insert 82 * // it into the EventQueue. It will be processed in a background thread, and doSomething() 83 * // can return immediately. 84 * EventQueue.getInstance.enqueue(new SomeClassUpdateEvent(this, measurement)); 85 * } 86 * } 87 * </pre> 88 */ 89 @ThreadSafe 90 public final class DisruptorEventQueue implements EventQueue { 91 92 private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName()); 93 94 // Number of events that can be enqueued at any one time. If more than this are enqueued, 95 // then subsequent attempts to enqueue new entries will block. 96 // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be 97 // configured to a size appropriate to the system (smaller/less busy systems will not need as 98 // large a queue. 99 private static final int DISRUPTOR_BUFFER_SIZE = 8192; 100 // The single instance of the class. 101 private static final DisruptorEventQueue eventQueue = create(); 102 103 // The event queue is built on this {@link Disruptor}. 104 private final Disruptor<DisruptorEvent> disruptor; 105 106 private volatile DisruptorEnqueuer enqueuer; 107 108 // Creates a new EventQueue. Private to prevent creation of non-singleton instance. DisruptorEventQueue(Disruptor<DisruptorEvent> disruptor, DisruptorEnqueuer enqueuer)109 private DisruptorEventQueue(Disruptor<DisruptorEvent> disruptor, DisruptorEnqueuer enqueuer) { 110 this.disruptor = disruptor; 111 this.enqueuer = enqueuer; 112 } 113 114 // Creates a new EventQueue. Private to prevent creation of non-singleton instance. create()115 private static DisruptorEventQueue create() { 116 // Create new Disruptor for processing. Note that Disruptor creates a single thread per 117 // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details); 118 // this ensures that the event handler can take unsynchronized actions whenever possible. 119 Disruptor<DisruptorEvent> disruptor = 120 new Disruptor<>( 121 DisruptorEventFactory.INSTANCE, 122 DISRUPTOR_BUFFER_SIZE, 123 new DaemonThreadFactory("OpenCensus.Disruptor"), 124 ProducerType.MULTI, 125 new SleepingWaitStrategy(0, 1000 * 1000)); 126 disruptor.handleEventsWith(new DisruptorEventHandler[] {DisruptorEventHandler.INSTANCE}); 127 disruptor.start(); 128 final RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer(); 129 130 DisruptorEnqueuer enqueuer = 131 new DisruptorEnqueuer() { 132 @Override 133 public void enqueue(Entry entry) { 134 long sequence = ringBuffer.next(); 135 try { 136 DisruptorEvent event = ringBuffer.get(sequence); 137 event.setEntry(entry); 138 } finally { 139 ringBuffer.publish(sequence); 140 } 141 } 142 }; 143 return new DisruptorEventQueue(disruptor, enqueuer); 144 } 145 146 /** 147 * Returns the {@link DisruptorEventQueue} instance. 148 * 149 * @return the singleton {@code EventQueue} instance. 150 */ getInstance()151 public static DisruptorEventQueue getInstance() { 152 return eventQueue; 153 } 154 155 /** 156 * Enqueues an event on the {@link DisruptorEventQueue}. 157 * 158 * @param entry a class encapsulating the actions to be taken for event processing. 159 */ 160 @Override enqueue(Entry entry)161 public void enqueue(Entry entry) { 162 enqueuer.enqueue(entry); 163 } 164 165 /** Shuts down the underlying disruptor. */ 166 @Override shutdown()167 public void shutdown() { 168 enqueuer = 169 new DisruptorEnqueuer() { 170 final AtomicBoolean logged = new AtomicBoolean(false); 171 172 @Override 173 public void enqueue(Entry entry) { 174 if (!logged.getAndSet(true)) { 175 logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown."); 176 } 177 } 178 }; 179 180 disruptor.shutdown(); 181 } 182 183 // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer 184 private abstract static class DisruptorEnqueuer { 185 enqueue(Entry entry)186 public abstract void enqueue(Entry entry); 187 } 188 189 // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry. 190 private static final class DisruptorEvent { 191 192 // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so 193 // intuitively this variable must be volatile. 194 @Nullable private volatile Entry entry = null; 195 196 // Sets the EventQueueEntry associated with this DisruptorEvent. setEntry(@ullable Entry entry)197 void setEntry(@Nullable Entry entry) { 198 this.entry = entry; 199 } 200 201 // Returns the EventQueueEntry associated with this DisruptorEvent. 202 @Nullable getEntry()203 Entry getEntry() { 204 return entry; 205 } 206 } 207 208 // Factory for DisruptorEvent. 209 private enum DisruptorEventFactory implements EventFactory<DisruptorEvent> { 210 INSTANCE; 211 212 @Override newInstance()213 public DisruptorEvent newInstance() { 214 return new DisruptorEvent(); 215 } 216 } 217 218 /** 219 * Every event that gets added to {@link EventQueue} will get processed here. Just calls the 220 * underlying process() method. 221 */ 222 private enum DisruptorEventHandler implements EventHandler<DisruptorEvent> { 223 INSTANCE; 224 225 @Override onEvent(DisruptorEvent event, long sequence, boolean endOfBatch)226 public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) { 227 Entry entry = event.getEntry(); 228 if (entry != null) { 229 entry.process(); 230 } 231 // Remove the reference to the previous entry to allow the memory to be gc'ed. 232 event.setEntry(null); 233 } 234 } 235 } 236