[/
 / Copyright (c) 2009 Helge Bahmann
 /
 / Distributed under the Boost Software License, Version 1.0. (See accompanying
 / file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 /]

[section:example_reference_counters Reference counting]

The purpose of a ['reference counter] is to count the number
of pointers to an object. The object can be destroyed as
soon as the reference counter reaches zero.

[section Implementation]

[c++]

  #include <boost/intrusive_ptr.hpp>
  #include <boost/atomic.hpp>

  class X {
  public:
    typedef boost::intrusive_ptr<X> pointer;
    X() : refcount_(0) {}

  private:
    mutable boost::atomic<int> refcount_;
    friend void intrusive_ptr_add_ref(const X * x)
    {
      x->refcount_.fetch_add(1, boost::memory_order_relaxed);
    }
    friend void intrusive_ptr_release(const X * x)
    {
      if (x->refcount_.fetch_sub(1, boost::memory_order_release) == 1) {
        boost::atomic_thread_fence(boost::memory_order_acquire);
        delete x;
      }
    }
  };

[endsect]

[section Usage]

[c++]

  X::pointer x = new X;

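Copying the smart pointer increments the count and destroying a
copy decrements it; the following continuation (an illustrative
addition, not part of the original example, relying only on
documented [^boost::intrusive_ptr] behaviour) shows this:

[c++]

  X::pointer y = x; // intrusive_ptr_add_ref: count is now 2
  y.reset();        // intrusive_ptr_release: count drops back to 1
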
[endsect]

[section Discussion]

Increasing the reference counter can always be done with
[^memory_order_relaxed]: New references to an object can only
be formed from an existing reference, and passing an existing
reference from one thread to another must already provide any
required synchronization.

It is important to enforce that any possible access to the object
in one thread (through an existing reference) ['happens before]
deleting the object in a different thread. This is achieved by
a "release" operation after dropping a reference (any access to
the object through this reference must obviously have happened
before), and an "acquire" operation before deleting the object.

It would be possible to use [^memory_order_acq_rel] for the
[^fetch_sub] operation, but this results in unneeded "acquire"
operations whenever the reference counter has not yet reached
zero, and may impose a performance penalty.
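
For comparison, a minimal sketch of this alternative (paying for
the "acquire" on every decrement instead of issuing one fence on
the final decrement):

[c++]

  friend void intrusive_ptr_release(const X * x)
  {
    // "acquire" is only needed by the thread performing the final
    // decrement, but is paid for on every decrement here
    if (x->refcount_.fetch_sub(1, boost::memory_order_acq_rel) == 1)
      delete x;
  }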

[endsect]

[endsect]

[section:example_spinlock Spinlock]

The purpose of a ['spin lock] is to prevent multiple threads
from concurrently accessing a shared data structure. In contrast
to a mutex, threads will busy-wait and waste CPU cycles instead
of yielding the CPU to another thread. ['Do not use spinlocks
unless you are certain that you understand the consequences.]

[section Implementation]

[c++]

  #include <boost/atomic.hpp>

  class spinlock {
  private:
    typedef enum {Locked, Unlocked} LockState;
    boost::atomic<LockState> state_;

  public:
    spinlock() : state_(Unlocked) {}

    void lock()
    {
      while (state_.exchange(Locked, boost::memory_order_acquire) == Locked) {
        /* busy-wait */
      }
    }
    void unlock()
    {
      state_.store(Unlocked, boost::memory_order_release);
    }
  };

[endsect]

[section Usage]

[c++]

  spinlock s;

  s.lock();
  // access data structure here
  s.unlock();

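Since an early return or an exception between [^lock()] and
[^unlock()] would leave the lock held forever, a small scope guard
is often used instead; the following wrapper is an illustrative
addition, not part of the original example:

[c++]

  class spinlock_guard {
  public:
    explicit spinlock_guard(spinlock & s) : s_(s) { s_.lock(); }
    ~spinlock_guard() { s_.unlock(); } // releases on every exit path
  private:
    spinlock & s_;
    spinlock_guard(const spinlock_guard &);            // non-copyable
    spinlock_guard & operator=(const spinlock_guard &);
  };

  {
    spinlock_guard guard(s);
    // access data structure here
  } // unlocked automatically
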
[endsect]

[section Discussion]

The purpose of the spinlock is to make sure that one access
to the shared data structure always strictly "happens before"
another. The use of acquire/release in lock/unlock is required
and sufficient to guarantee this ordering.

It would be correct to write the "lock" operation in the following
way:

[c++]

  void lock()
  {
    while (state_.exchange(Locked, boost::memory_order_relaxed) == Locked) {
      /* busy-wait */
    }
    boost::atomic_thread_fence(boost::memory_order_acquire);
  }

This "optimization" is however a) useless and b) may in fact hurt:
a) since the thread busy-spins on a blocked spinlock anyway, it
does not matter whether it wastes CPU cycles on just "exchange"
operations or on both useless "exchange" and "acquire" operations;
b) a tight "exchange" loop without any memory-synchronizing
instruction introduced through an "acquire" operation will on some
systems monopolize the memory subsystem and degrade the performance
of other system components.

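One common way to reduce this pressure on the memory subsystem is
the well-known "test and test-and-set" technique: spin on a plain
load and retry the "exchange" only when the lock appears free. The
following is a sketch of that technique, not part of the original
example:

[c++]

  void lock()
  {
    for (;;) {
      // attempt to acquire; "acquire" ordering applies on success
      if (state_.exchange(Locked, boost::memory_order_acquire) != Locked)
        return;
      // spin on a cheap load until the lock looks free again
      while (state_.load(boost::memory_order_relaxed) == Locked) {
        /* busy-wait */
      }
    }
  }
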
[endsect]

[endsect]

[section:singleton Singleton with double-checked locking pattern]

The purpose of the ['Singleton with double-checked locking pattern]
is to ensure that at most one instance of a particular object is
created. If one instance has been created already, access to the
existing object should be as light-weight as possible.

[section Implementation]

[c++]

  #include <boost/atomic.hpp>
  #include <boost/thread/mutex.hpp>

  class X {
  public:
    static X * instance()
    {
      X * tmp = instance_.load(boost::memory_order_consume);
      if (!tmp) {
        boost::mutex::scoped_lock guard(instantiation_mutex);
        tmp = instance_.load(boost::memory_order_consume);
        if (!tmp) {
          tmp = new X;
          instance_.store(tmp, boost::memory_order_release);
        }
      }
      return tmp;
    }
  private:
    static boost::atomic<X *> instance_;
    static boost::mutex instantiation_mutex;
  };

  boost::atomic<X *> X::instance_(0);
  boost::mutex X::instantiation_mutex;

[endsect]

[section Usage]

[c++]

  X * x = X::instance();
  // dereference x

[endsect]

[section Discussion]

The mutex makes sure that only one instance of the object is
ever created. The [^instance] method must make sure that any
dereference of the object strictly "happens after" creating
the instance in another thread. The use of [^memory_order_release]
after creating and initializing the object and [^memory_order_consume]
before dereferencing the object provides this guarantee.

It would be permissible to use [^memory_order_acquire] instead of
[^memory_order_consume], but this provides a stronger guarantee
than is required since only operations depending on the value of
the pointer need to be ordered.

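For illustration, the stronger variant mentioned above would simply
replace the two loads with:

[c++]

  X * tmp = instance_.load(boost::memory_order_acquire);
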
[endsect]

[endsect]

[section:example_ringbuffer Wait-free ring buffer]

A ['wait-free ring buffer] provides a mechanism for relaying objects
from one single "producer" thread to one single "consumer" thread without
any locks. The operations on this data structure are "wait-free", which
means that each operation finishes within a constant number of steps.
This makes the data structure suitable for use in hard real-time systems
or for communication with interrupt/signal handlers.

[section Implementation]

[c++]

  #include <boost/atomic.hpp>

  template<typename T, size_t Size>
  class ringbuffer {
  public:
    ringbuffer() : head_(0), tail_(0) {}

    bool push(const T & value)
    {
      size_t head = head_.load(boost::memory_order_relaxed);
      size_t next_head = next(head);
      if (next_head == tail_.load(boost::memory_order_acquire))
        return false;
      ring_[head] = value;
      head_.store(next_head, boost::memory_order_release);
      return true;
    }
    bool pop(T & value)
    {
      size_t tail = tail_.load(boost::memory_order_relaxed);
      if (tail == head_.load(boost::memory_order_acquire))
        return false;
      value = ring_[tail];
      tail_.store(next(tail), boost::memory_order_release);
      return true;
    }
  private:
    size_t next(size_t current)
    {
      return (current + 1) % Size;
    }
    T ring_[Size];
    boost::atomic<size_t> head_, tail_;
  };

[endsect]

[section Usage]

[c++]

  ringbuffer<int, 32> r;

  // try to insert an element
  if (r.push(42)) { /* succeeded */ }
  else { /* buffer full */ }

  // try to retrieve an element
  int value;
  if (r.pop(value)) { /* succeeded */ }
  else { /* buffer empty */ }

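Note that the wait-free guarantee holds only with exactly one
producer thread and one consumer thread. The following sketch of
the intended deployment is an illustrative addition (the thread
bodies and element counts are assumptions, not part of the
original example):

[c++]

  ringbuffer<int, 32> r; // shared between exactly two threads

  void producer()
  {
    for (int i = 0; i < 1000; ++i)
      while (!r.push(i)) { /* buffer full, retry */ }
  }

  void consumer()
  {
    int value;
    for (int i = 0; i < 1000; ++i) {
      while (!r.pop(value)) { /* buffer empty, retry */ }
      // process value
    }
  }

  // e.g. boost::thread t1(producer), t2(consumer); t1.join(); t2.join();
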
[endsect]

[section Discussion]

The implementation makes sure that the ring indices do not
"lap around" each other, ensuring that no elements are either
lost or read twice. To this end, [^push] fails once the buffer
holds [^Size-1] elements: one slot is deliberately kept empty,
which allows a full buffer to be distinguished from an empty one.

Furthermore it must guarantee that read access to a
particular object in [^pop] "happens after" it has been
written in [^push]. This is achieved by writing [^head_]
with "release" and reading it with "acquire". Conversely
the implementation also ensures that read access to a
particular ring element "happens before" rewriting this
element with a new value, by accessing [^tail_] with
the appropriate ordering constraints.

[endsect]

[endsect]

[section:mp_queue Lock-free multi-producer queue]

The purpose of the ['lock-free multi-producer queue] is to allow
an arbitrary number of producers to enqueue objects which are
retrieved and processed in FIFO order by a single consumer.

[section Implementation]

[c++]

  #include <boost/atomic.hpp>

  template<typename T>
  class lockfree_queue {
  public:
    struct node {
      T data;
      node * next;
    };

    lockfree_queue() : head_(0) {}

    void push(const T & data)
    {
      node * n = new node;
      n->data = data;
      node * stale_head = head_.load(boost::memory_order_relaxed);
      do {
        n->next = stale_head;
        // compare_exchange_weak updates stale_head on failure
      } while (!head_.compare_exchange_weak(stale_head, n, boost::memory_order_release));
    }

    node * pop_all(void)
    {
      node * last = pop_all_reverse(), * first = 0;
      while (last) {
        node * tmp = last;
        last = last->next;
        tmp->next = first;
        first = tmp;
      }
      return first;
    }

    // alternative interface if ordering is of no importance
    node * pop_all_reverse(void)
    {
      return head_.exchange(0, boost::memory_order_consume);
    }
  private:
    boost::atomic<node *> head_;
  };

[endsect]

[section Usage]

[c++]

  lockfree_queue<int> q;

  // insert elements
  q.push(42);
  q.push(2);

  // pop elements
  lockfree_queue<int>::node * x = q.pop_all();
  while (x) {
    lockfree_queue<int>::node * tmp = x;
    x = x->next;
    // process tmp->data here
    delete tmp;
  }

[endsect]

[section Discussion]

The implementation guarantees that all enqueued objects are
processed in the order they were enqueued, by building a
singly-linked list of objects in reverse processing order.
The queue is atomically emptied by the consumer (in an operation
that is not only lock-free but wait-free) and then brought into
the correct order.

It must be guaranteed that any access to an object to be enqueued
by the producer "happens before" any access by the consumer. This
is assured by inserting objects into the list with ['release] and
dequeuing them with ['consume] memory order. It is not necessary
to use ['acquire] memory order in [^lockfree_queue::pop_all_reverse]
because all operations involved depend on the value of the atomic
pointer through dereference.

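For comparison, the stronger but here unnecessary variant would
read:

[c++]

  node * pop_all_reverse(void)
  {
    return head_.exchange(0, boost::memory_order_acquire);
  }
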
[endsect]

[endsect]