• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017, OpenCensus Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.opencensus.impl.internal;
18 
19 import com.lmax.disruptor.EventFactory;
20 import com.lmax.disruptor.EventHandler;
21 import com.lmax.disruptor.RingBuffer;
22 import com.lmax.disruptor.SleepingWaitStrategy;
23 import com.lmax.disruptor.dsl.Disruptor;
24 import com.lmax.disruptor.dsl.ProducerType;
25 import io.opencensus.implcore.internal.DaemonThreadFactory;
26 import io.opencensus.implcore.internal.EventQueue;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30 import javax.annotation.Nullable;
31 import javax.annotation.concurrent.ThreadSafe;
32 
33 /**
34  * A low-latency event queue for background updating of (possibly contended) objects. This is
35  * intended for use by instrumentation methods to ensure that they do not block foreground
36  * activities. To customize the action taken on reading the queue, derive a new class from {@link
37  * EventQueue.Entry} and pass it to the {@link #enqueue(Entry)} method. The {@link Entry#process()}
38  * method of your class will be called and executed in a background thread. This class is a
39  * Singleton.
40  *
41  * <p>Example Usage: Given a class as follows:
42  *
43  * <pre>
44  * public class someClass {
45  *   public void doSomething() {
46  *     // Do the work of the method. One result is a measurement of something.
47  *     int measurement = doSomeWork();
48  *     // Make an update to the class state, based on this measurement. This work can take some
49  *     // time, but can be done asynchronously, in the background.
50  *     update(measurement);
51  *   }
52  *
53  *   public void update(int arg) {
54  *     // do something
55  *   }
56  * }
57  * </pre>
58  *
59  * <p>The work of calling {@code someClass.update()} can be executed in the backgound as follows:
60  *
61  * <pre>
62  * public class someClass {
63  *   // Add a EventQueueEntry class that will process the update call.
64  *   private static final class SomeClassUpdateEvent implements EventQueueEntry {
65  *     private final SomeClass someClassInstance;
66  *     private final int arg;
67  *
68  *     SomeObjectUpdateEvent(SomeObject someClassInstance, int arg) {
69  *       this.someClassInstance = someClassInstance;
70  *       this.arg = arg;
71  *     }
72  *
73  *     &#064;Override
74  *     public void process() {
75  *       someClassInstance.update(arg);
76  *     }
77  *   }
78  *
79  *   public void doSomething() {
80  *     int measurement = doSomeWork();
81  *     // Instead of calling update() directly, create an event to do the processing, and insert
82  *     // it into the EventQueue. It will be processed in a background thread, and doSomething()
83  *     // can return immediately.
84  *     EventQueue.getInstance.enqueue(new SomeClassUpdateEvent(this, measurement));
85  *   }
86  * }
87  * </pre>
88  */
89 @ThreadSafe
90 public final class DisruptorEventQueue implements EventQueue {
91 
92   private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName());
93 
94   // Number of events that can be enqueued at any one time. If more than this are enqueued,
95   // then subsequent attempts to enqueue new entries will block.
96   // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be
97   // configured to a size appropriate to the system (smaller/less busy systems will not need as
98   // large a queue.
99   private static final int DISRUPTOR_BUFFER_SIZE = 8192;
100   // The single instance of the class.
101   private static final DisruptorEventQueue eventQueue = create();
102 
103   // The event queue is built on this {@link Disruptor}.
104   private final Disruptor<DisruptorEvent> disruptor;
105   // Ring Buffer for the {@link Disruptor} that underlies the queue.
106   private final RingBuffer<DisruptorEvent> ringBuffer;
107 
108   private volatile DisruptorEnqueuer enqueuer;
109 
110   // Creates a new EventQueue. Private to prevent creation of non-singleton instance.
DisruptorEventQueue( Disruptor<DisruptorEvent> disruptor, RingBuffer<DisruptorEvent> ringBuffer, DisruptorEnqueuer enqueuer)111   private DisruptorEventQueue(
112       Disruptor<DisruptorEvent> disruptor,
113       RingBuffer<DisruptorEvent> ringBuffer,
114       DisruptorEnqueuer enqueuer) {
115     this.disruptor = disruptor;
116     this.ringBuffer = ringBuffer;
117     this.enqueuer = enqueuer;
118   }
119 
120   // Creates a new EventQueue. Private to prevent creation of non-singleton instance.
create()121   private static DisruptorEventQueue create() {
122     // Create new Disruptor for processing. Note that Disruptor creates a single thread per
123     // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details);
124     // this ensures that the event handler can take unsynchronized actions whenever possible.
125     Disruptor<DisruptorEvent> disruptor =
126         new Disruptor<>(
127             DisruptorEventFactory.INSTANCE,
128             DISRUPTOR_BUFFER_SIZE,
129             new DaemonThreadFactory("OpenCensus.Disruptor"),
130             ProducerType.MULTI,
131             new SleepingWaitStrategy());
132     disruptor.handleEventsWith(new DisruptorEventHandler[] {DisruptorEventHandler.INSTANCE});
133     disruptor.start();
134     final RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer();
135 
136     DisruptorEnqueuer enqueuer =
137         new DisruptorEnqueuer() {
138           @Override
139           public void enqueue(Entry entry) {
140             long sequence = ringBuffer.next();
141             try {
142               DisruptorEvent event = ringBuffer.get(sequence);
143               event.setEntry(entry);
144             } finally {
145               ringBuffer.publish(sequence);
146             }
147           }
148         };
149     return new DisruptorEventQueue(disruptor, ringBuffer, enqueuer);
150   }
151 
152   /**
153    * Returns the {@link DisruptorEventQueue} instance.
154    *
155    * @return the singleton {@code EventQueue} instance.
156    */
getInstance()157   public static DisruptorEventQueue getInstance() {
158     return eventQueue;
159   }
160 
161   /**
162    * Enqueues an event on the {@link DisruptorEventQueue}.
163    *
164    * @param entry a class encapsulating the actions to be taken for event processing.
165    */
166   @Override
enqueue(Entry entry)167   public void enqueue(Entry entry) {
168     enqueuer.enqueue(entry);
169   }
170 
171   /** Shuts down the underlying disruptor. */
172   @Override
shutdown()173   public void shutdown() {
174     enqueuer =
175         new DisruptorEnqueuer() {
176           final AtomicBoolean logged = new AtomicBoolean(false);
177 
178           @Override
179           public void enqueue(Entry entry) {
180             if (!logged.getAndSet(true)) {
181               logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown.");
182             }
183           }
184         };
185 
186     disruptor.shutdown();
187   }
188 
189   // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer
190   private abstract static class DisruptorEnqueuer {
191 
enqueue(Entry entry)192     public abstract void enqueue(Entry entry);
193   }
194 
195   // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
196   private static final class DisruptorEvent {
197 
198     // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so
199     // intuitively this variable must be volatile.
200     @Nullable private volatile Entry entry = null;
201 
202     // Sets the EventQueueEntry associated with this DisruptorEvent.
setEntry(@ullable Entry entry)203     void setEntry(@Nullable Entry entry) {
204       this.entry = entry;
205     }
206 
207     // Returns the EventQueueEntry associated with this DisruptorEvent.
208     @Nullable
getEntry()209     Entry getEntry() {
210       return entry;
211     }
212   }
213 
214   // Factory for DisruptorEvent.
215   private enum DisruptorEventFactory implements EventFactory<DisruptorEvent> {
216     INSTANCE;
217 
218     @Override
newInstance()219     public DisruptorEvent newInstance() {
220       return new DisruptorEvent();
221     }
222   }
223 
224   /**
225    * Every event that gets added to {@link EventQueue} will get processed here. Just calls the
226    * underlying process() method.
227    */
228   private enum DisruptorEventHandler implements EventHandler<DisruptorEvent> {
229     INSTANCE;
230 
231     @Override
onEvent(DisruptorEvent event, long sequence, boolean endOfBatch)232     public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) {
233       Entry entry = event.getEntry();
234       if (entry != null) {
235         entry.process();
236       }
237       // Remove the reference to the previous entry to allow the memory to be gc'ed.
238       event.setEntry(null);
239     }
240   }
241 }
242