[/
 / Copyright (c) 2009 Helge Bahmann
 /
 / Distributed under the Boost Software License, Version 1.0. (See accompanying
 / file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 /]

[section:example_reference_counters Reference counting]

The purpose of a ['reference counter] is to count the number
of pointers to an object. The object can be destroyed as
soon as the reference counter reaches zero.

[section Implementation]

[c++]

    #include <boost/intrusive_ptr.hpp>
    #include <boost/atomic.hpp>

    class X {
    public:
        typedef boost::intrusive_ptr<X> pointer;
        X() : refcount_(0) {}

    private:
        mutable boost::atomic<int> refcount_;
        friend void intrusive_ptr_add_ref(const X * x)
        {
            x->refcount_.fetch_add(1, boost::memory_order_relaxed);
        }
        friend void intrusive_ptr_release(const X * x)
        {
            if (x->refcount_.fetch_sub(1, boost::memory_order_release) == 1) {
                boost::atomic_thread_fence(boost::memory_order_acquire);
                delete x;
            }
        }
    };

[endsect]

[section Usage]

[c++]

    X::pointer x = new X;

[endsect]

[section Discussion]

Increasing the reference counter can always be done with
[^memory_order_relaxed]: new references to an object can only
be formed from an existing reference, and passing an existing
reference from one thread to another must already provide any
required synchronization.

It is important to ensure that any possible access to the object in
one thread (through an existing reference) ['happens before]
deleting the object in a different thread. This is achieved
by a "release" operation after dropping a reference (any
access to the object through this reference must obviously
have happened before) and an "acquire" operation before
deleting the object.

It would be possible to use [^memory_order_acq_rel] for the
[^fetch_sub] operation, but this results in unneeded "acquire"
operations while the reference counter has not yet reached zero
and may impose a performance penalty.

[endsect]

[endsect]

[section:example_spinlock Spinlock]

The purpose of a ['spin lock] is to prevent multiple threads
from concurrently accessing a shared data structure. In contrast
to a mutex, threads will busy-wait and waste CPU cycles instead
of yielding the CPU to another thread. ['Do not use spinlocks
unless you are certain that you understand the consequences.]

[section Implementation]

[c++]

    #include <boost/atomic.hpp>

    class spinlock {
    private:
        typedef enum {Locked, Unlocked} LockState;
        boost::atomic<LockState> state_;

    public:
        spinlock() : state_(Unlocked) {}

        void lock()
        {
            while (state_.exchange(Locked, boost::memory_order_acquire) == Locked) {
                /* busy-wait */
            }
        }
        void unlock()
        {
            state_.store(Unlocked, boost::memory_order_release);
        }
    };

[endsect]

[section Usage]

[c++]

    spinlock s;

    s.lock();
    // access data structure here
    s.unlock();

[endsect]

[section Discussion]

The purpose of the spinlock is to make sure that one access
to the shared data structure always strictly "happens before"
another. The use of acquire/release in [^lock]/[^unlock] is required
and sufficient to guarantee this ordering.
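
As a side note, the same "acquire" exchange also yields a natural,
non-blocking [^try_lock] operation. The following is a sketch of a
hypothetical extension to the class above, not part of the original
example:

[c++]

    // Hypothetical extension: attempt to take the lock without spinning.
    // Returns true if the lock was acquired by this call.
    bool try_lock()
    {
        return state_.exchange(Locked, boost::memory_order_acquire) != Locked;
    }

If the exchange returns [^Unlocked], this thread is the one that
changed the state and therefore owns the lock; otherwise the lock
was already held by another thread and the stored value is unchanged.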

It would also be correct to write the [^lock] operation in the following
way:

[c++]

    void lock()
    {
        while (state_.exchange(Locked, boost::memory_order_relaxed) == Locked) {
            /* busy-wait */
        }
        boost::atomic_thread_fence(boost::memory_order_acquire);
    }

This "optimization" is however a) useless and b) possibly harmful:
a) since the thread will be busily spinning on a blocked spinlock
anyway, it does not matter whether it wastes the CPU cycles on just
"exchange" operations or on both useless "exchange" and "acquire"
operations; b) a tight "exchange" loop without any
memory-synchronizing instruction introduced through an "acquire"
operation will on some systems monopolize the memory subsystem
and degrade the performance of other system components.

[endsect]

[endsect]

[section:singleton Singleton with double-checked locking pattern]

The purpose of the ['Singleton with double-checked locking pattern] is to ensure
that at most one instance of a particular object is created.
If one instance has been created already, access to the existing
object should be as light-weight as possible.

[section Implementation]

[c++]

    #include <boost/atomic.hpp>
    #include <boost/thread/mutex.hpp>

    class X {
    public:
        static X * instance()
        {
            X * tmp = instance_.load(boost::memory_order_consume);
            if (!tmp) {
                boost::mutex::scoped_lock guard(instantiation_mutex);
                tmp = instance_.load(boost::memory_order_consume);
                if (!tmp) {
                    tmp = new X;
                    instance_.store(tmp, boost::memory_order_release);
                }
            }
            return tmp;
        }
    private:
        static boost::atomic<X *> instance_;
        static boost::mutex instantiation_mutex;
    };

    boost::atomic<X *> X::instance_(0);
    boost::mutex X::instantiation_mutex;

[endsect]

[section Usage]

[c++]

    X * x = X::instance();
    // dereference x

[endsect]

[section Discussion]

The mutex makes sure that only one instance of the object is
ever created. The [^instance] method must make sure that any
dereference of the object strictly "happens after" creating
the instance in another thread. The use of [^memory_order_release]
after creating and initializing the object and [^memory_order_consume]
before dereferencing the object provides this guarantee.

It would be permissible to use [^memory_order_acquire] instead of
[^memory_order_consume], but this provides a stronger guarantee
than is required, since only operations depending on the value of
the pointer need to be ordered.

[endsect]

[endsect]

[section:example_ringbuffer Wait-free ring buffer]

A ['wait-free ring buffer] provides a mechanism for relaying objects
from one single "producer" thread to one single "consumer" thread without
any locks. The operations on this data structure are "wait-free", which
means that each operation finishes within a constant number of steps.
This makes this data structure suitable for use in hard real-time systems
or for communication with interrupt/signal handlers.
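
To make this deployment pattern concrete, here is a sketch of one
producer thread and one consumer thread communicating through the
buffer. It is an illustration only; it assumes the [^ringbuffer]
class defined in the next section together with [^boost::thread],
and the names [^producer], [^consumer] and [^queue] are hypothetical:

[c++]

    #include <boost/thread/thread.hpp>

    ringbuffer<int, 1024> queue;    // class defined in the next section

    void producer()
    {
        for (int i = 0; i < 100; ) {
            if (queue.push(i))
                ++i;                // retry until there is room in the buffer
        }
    }

    void consumer()
    {
        int received = 0;
        while (received < 100) {
            int value;
            if (queue.pop(value)) {
                // process "value" here
                ++received;
            }
        }
    }

    int main()
    {
        boost::thread t1(&producer);
        boost::thread t2(&consumer);
        t1.join();
        t2.join();
        return 0;
    }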

[section Implementation]

[c++]

    #include <boost/atomic.hpp>

    template<typename T, size_t Size>
    class ringbuffer {
    public:
        ringbuffer() : head_(0), tail_(0) {}

        bool push(const T & value)
        {
            size_t head = head_.load(boost::memory_order_relaxed);
            size_t next_head = next(head);
            if (next_head == tail_.load(boost::memory_order_acquire))
                return false;
            ring_[head] = value;
            head_.store(next_head, boost::memory_order_release);
            return true;
        }
        bool pop(T & value)
        {
            size_t tail = tail_.load(boost::memory_order_relaxed);
            if (tail == head_.load(boost::memory_order_acquire))
                return false;
            value = ring_[tail];
            tail_.store(next(tail), boost::memory_order_release);
            return true;
        }
    private:
        size_t next(size_t current)
        {
            return (current + 1) % Size;
        }
        T ring_[Size];
        boost::atomic<size_t> head_, tail_;
    };

[endsect]

[section Usage]

[c++]

    ringbuffer<int, 32> r;

    // try to insert an element
    if (r.push(42)) { /* succeeded */ }
    else { /* buffer full */ }

    // try to retrieve an element
    int value;
    if (r.pop(value)) { /* succeeded */ }
    else { /* buffer empty */ }

[endsect]

[section Discussion]

The implementation makes sure that the ring indices do
not "lap around" each other, so that no element is
either lost or read twice: [^push] fails when advancing [^head_ ]
would make it collide with [^tail_], and [^pop] fails when
[^tail_] has caught up with [^head_ ]. As a consequence, a buffer
with [^Size] slots can hold at most [^Size - 1] elements.

Furthermore it must guarantee that read access to a
particular object in [^pop] "happens after" it has been
written in [^push]. This is achieved by writing [^head_ ]
with "release" and reading it with "acquire". Conversely,
the implementation also ensures that read access to
a particular ring element "happens before"
overwriting this element with a new value, by accessing [^tail_]
with appropriate ordering constraints.
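
A small illustration of the capacity consequence (a hypothetical
snippet, not part of the original example):

[c++]

    #include <cassert>

    ringbuffer<int, 4> r;    // 4 slots, so at most 3 elements
    assert(r.push(1));
    assert(r.push(2));
    assert(r.push(3));
    assert(!r.push(4));      // full: one slot always remains empty

    int v;
    assert(r.pop(v) && v == 1);
    assert(r.push(4));       // one slot became free again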

[endsect]

[endsect]

[section:mp_queue Lock-free multi-producer queue]

The purpose of the ['lock-free multi-producer queue] is to allow
an arbitrary number of producers to enqueue objects which are
retrieved and processed in FIFO order by a single consumer.

[section Implementation]

[c++]

    #include <boost/atomic.hpp>

    template<typename T>
    class lockfree_queue {
    public:
        struct node {
            T data;
            node * next;
        };
        void push(const T & data)
        {
            node * n = new node;
            n->data = data;
            node * stale_head = head_.load(boost::memory_order_relaxed);
            do {
                n->next = stale_head;
            } while (!head_.compare_exchange_weak(stale_head, n, boost::memory_order_release));
        }

        node * pop_all(void)
        {
            node * last = pop_all_reverse(), * first = 0;
            while (last) {
                node * tmp = last;
                last = last->next;
                tmp->next = first;
                first = tmp;
            }
            return first;
        }

        lockfree_queue() : head_(0) {}

        // alternative interface if ordering is of no importance
        node * pop_all_reverse(void)
        {
            return head_.exchange(0, boost::memory_order_consume);
        }
    private:
        boost::atomic<node *> head_;
    };

[endsect]

[section Usage]

[c++]

    lockfree_queue<int> q;

    // insert elements
    q.push(42);
    q.push(2);

    // pop elements
    lockfree_queue<int>::node * x = q.pop_all();
    while (x) {
        lockfree_queue<int>::node * tmp = x;
        x = x->next;
        // process tmp->data, probably delete it afterwards
        delete tmp;
    }

[endsect]

[section Discussion]

The implementation guarantees that all enqueued objects are
processed in the order they were enqueued, by building a singly-linked
list of objects in reverse processing order. The queue is atomically
emptied by the consumer (in an operation that is not only lock-free but
wait-free) and then brought into the correct order by reversing the list.

It must be guaranteed that any access to an object to be enqueued
by the producer "happens before" any access by the consumer. This
is assured by inserting objects into the list with ['release] and
dequeuing them with ['consume] memory order. It is not
necessary to use ['acquire] memory order in [^lockfree_queue::pop_all_reverse]
because all operations involved depend on the value of
the atomic pointer through dereference.

[endsect]

[endsect]