• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017, OpenCensus Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.opencensus.impl.internal;
18 
19 import com.lmax.disruptor.EventFactory;
20 import com.lmax.disruptor.EventHandler;
21 import com.lmax.disruptor.RingBuffer;
22 import com.lmax.disruptor.SleepingWaitStrategy;
23 import com.lmax.disruptor.dsl.Disruptor;
24 import com.lmax.disruptor.dsl.ProducerType;
25 import io.opencensus.implcore.internal.DaemonThreadFactory;
26 import io.opencensus.implcore.internal.EventQueue;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30 import javax.annotation.Nullable;
31 import javax.annotation.concurrent.ThreadSafe;
32 
33 /**
34  * A low-latency event queue for background updating of (possibly contended) objects. This is
35  * intended for use by instrumentation methods to ensure that they do not block foreground
36  * activities. To customize the action taken on reading the queue, derive a new class from {@link
37  * EventQueue.Entry} and pass it to the {@link #enqueue(Entry)} method. The {@link Entry#process()}
38  * method of your class will be called and executed in a background thread. This class is a
39  * Singleton.
40  *
41  * <p>Example Usage: Given a class as follows:
42  *
43  * <pre>
44  * public class someClass {
45  *   public void doSomething() {
46  *     // Do the work of the method. One result is a measurement of something.
47  *     int measurement = doSomeWork();
48  *     // Make an update to the class state, based on this measurement. This work can take some
49  *     // time, but can be done asynchronously, in the background.
50  *     update(measurement);
51  *   }
52  *
53  *   public void update(int arg) {
54  *     // do something
55  *   }
56  * }
57  * </pre>
58  *
59  * <p>The work of calling {@code someClass.update()} can be executed in the backgound as follows:
60  *
61  * <pre>
62  * public class someClass {
63  *   // Add a EventQueueEntry class that will process the update call.
64  *   private static final class SomeClassUpdateEvent implements EventQueueEntry {
65  *     private final SomeClass someClassInstance;
66  *     private final int arg;
67  *
68  *     SomeObjectUpdateEvent(SomeObject someClassInstance, int arg) {
69  *       this.someClassInstance = someClassInstance;
70  *       this.arg = arg;
71  *     }
72  *
73  *     &#064;Override
74  *     public void process() {
75  *       someClassInstance.update(arg);
76  *     }
77  *   }
78  *
79  *   public void doSomething() {
80  *     int measurement = doSomeWork();
81  *     // Instead of calling update() directly, create an event to do the processing, and insert
82  *     // it into the EventQueue. It will be processed in a background thread, and doSomething()
83  *     // can return immediately.
84  *     EventQueue.getInstance.enqueue(new SomeClassUpdateEvent(this, measurement));
85  *   }
86  * }
87  * </pre>
88  */
89 @ThreadSafe
90 public final class DisruptorEventQueue implements EventQueue {
91 
92   private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName());
93 
94   // Number of events that can be enqueued at any one time. If more than this are enqueued,
95   // then subsequent attempts to enqueue new entries will block.
96   // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be
97   // configured to a size appropriate to the system (smaller/less busy systems will not need as
98   // large a queue.
99   private static final int DISRUPTOR_BUFFER_SIZE = 8192;
100   // The single instance of the class.
101   private static final DisruptorEventQueue eventQueue = create();
102 
103   // The event queue is built on this {@link Disruptor}.
104   private final Disruptor<DisruptorEvent> disruptor;
105 
106   private volatile DisruptorEnqueuer enqueuer;
107 
108   // Creates a new EventQueue. Private to prevent creation of non-singleton instance.
DisruptorEventQueue(Disruptor<DisruptorEvent> disruptor, DisruptorEnqueuer enqueuer)109   private DisruptorEventQueue(Disruptor<DisruptorEvent> disruptor, DisruptorEnqueuer enqueuer) {
110     this.disruptor = disruptor;
111     this.enqueuer = enqueuer;
112   }
113 
114   // Creates a new EventQueue. Private to prevent creation of non-singleton instance.
create()115   private static DisruptorEventQueue create() {
116     // Create new Disruptor for processing. Note that Disruptor creates a single thread per
117     // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details);
118     // this ensures that the event handler can take unsynchronized actions whenever possible.
119     Disruptor<DisruptorEvent> disruptor =
120         new Disruptor<>(
121             DisruptorEventFactory.INSTANCE,
122             DISRUPTOR_BUFFER_SIZE,
123             new DaemonThreadFactory("OpenCensus.Disruptor"),
124             ProducerType.MULTI,
125             new SleepingWaitStrategy(0, 1000 * 1000));
126     disruptor.handleEventsWith(new DisruptorEventHandler[] {DisruptorEventHandler.INSTANCE});
127     disruptor.start();
128     final RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer();
129 
130     DisruptorEnqueuer enqueuer =
131         new DisruptorEnqueuer() {
132           @Override
133           public void enqueue(Entry entry) {
134             long sequence = ringBuffer.next();
135             try {
136               DisruptorEvent event = ringBuffer.get(sequence);
137               event.setEntry(entry);
138             } finally {
139               ringBuffer.publish(sequence);
140             }
141           }
142         };
143     return new DisruptorEventQueue(disruptor, enqueuer);
144   }
145 
146   /**
147    * Returns the {@link DisruptorEventQueue} instance.
148    *
149    * @return the singleton {@code EventQueue} instance.
150    */
getInstance()151   public static DisruptorEventQueue getInstance() {
152     return eventQueue;
153   }
154 
155   /**
156    * Enqueues an event on the {@link DisruptorEventQueue}.
157    *
158    * @param entry a class encapsulating the actions to be taken for event processing.
159    */
160   @Override
enqueue(Entry entry)161   public void enqueue(Entry entry) {
162     enqueuer.enqueue(entry);
163   }
164 
165   /** Shuts down the underlying disruptor. */
166   @Override
shutdown()167   public void shutdown() {
168     enqueuer =
169         new DisruptorEnqueuer() {
170           final AtomicBoolean logged = new AtomicBoolean(false);
171 
172           @Override
173           public void enqueue(Entry entry) {
174             if (!logged.getAndSet(true)) {
175               logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown.");
176             }
177           }
178         };
179 
180     disruptor.shutdown();
181   }
182 
183   // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer
184   private abstract static class DisruptorEnqueuer {
185 
enqueue(Entry entry)186     public abstract void enqueue(Entry entry);
187   }
188 
189   // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
190   private static final class DisruptorEvent {
191 
192     // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so
193     // intuitively this variable must be volatile.
194     @Nullable private volatile Entry entry = null;
195 
196     // Sets the EventQueueEntry associated with this DisruptorEvent.
setEntry(@ullable Entry entry)197     void setEntry(@Nullable Entry entry) {
198       this.entry = entry;
199     }
200 
201     // Returns the EventQueueEntry associated with this DisruptorEvent.
202     @Nullable
getEntry()203     Entry getEntry() {
204       return entry;
205     }
206   }
207 
208   // Factory for DisruptorEvent.
209   private enum DisruptorEventFactory implements EventFactory<DisruptorEvent> {
210     INSTANCE;
211 
212     @Override
newInstance()213     public DisruptorEvent newInstance() {
214       return new DisruptorEvent();
215     }
216   }
217 
218   /**
219    * Every event that gets added to {@link EventQueue} will get processed here. Just calls the
220    * underlying process() method.
221    */
222   private enum DisruptorEventHandler implements EventHandler<DisruptorEvent> {
223     INSTANCE;
224 
225     @Override
onEvent(DisruptorEvent event, long sequence, boolean endOfBatch)226     public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) {
227       Entry entry = event.getEntry();
228       if (entry != null) {
229         entry.process();
230       }
231       // Remove the reference to the previous entry to allow the memory to be gc'ed.
232       event.setEntry(null);
233     }
234   }
235 }
236