// lock-free queue from
// Michael, M. M. and Scott, M. L.,
// "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
//
// Copyright (C) 2008-2013 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_FIFO_HPP_INCLUDED
#define BOOST_LOCKFREE_FIFO_HPP_INCLUDED

#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/core/allocator_access.hpp>
#include <boost/type_traits/has_trivial_assign.hpp>
#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY & BOOST_ALIGNMENT

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/freelist.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/tagged_ptr.hpp>

#include <boost/lockfree/lockfree_forward.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif


#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#endif

#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(push)
#pragma warning(disable:488) // template parameter unused in declaring parameter types;
                             // erroneously triggered by the queue constructor that
                             // takes an allocator of another type and rebinds it
#endif


namespace boost {
namespace lockfree {
namespace detail {

typedef parameter::parameters<boost::parameter::optional<tag::allocator>,
                              boost::parameter::optional<tag::capacity>
                             > queue_signature;

} /* namespace detail */


/** The queue class provides a multi-writer/multi-reader queue; pushing and popping are lock-free,
 *  while construction and destruction have to be synchronized. It uses a freelist for memory management:
 *  freed nodes are pushed to the freelist and are not returned to the OS before the queue is destroyed.
 *
 * \b Policies:
 * - \ref boost::lockfree::fixed_sized, defaults to \c boost::lockfree::fixed_sized<false> \n
 *   Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. \n
 *   If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and are addressed
 *   by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the
 *   index type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the
 *   best way to achieve lock-freedom.
 *
 * - \ref boost::lockfree::capacity, optional \n
 *   If this template argument is passed to the options, the size of the queue is set at compile-time. \n
 *   This option implies \c fixed_sized<true>
 *
 * - \ref boost::lockfree::allocator, defaults to \c boost::lockfree::allocator<std::allocator<void>> \n
 *   Specifies the allocator that is used for the internal freelist
 *
 * \b Requirements:
 * - T must have a copy constructor
 * - T must have a trivial assignment operator
 * - T must have a trivial destructor
 *
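 * \b Example: \n
 * Usage sketch (the element type and capacity are illustrative choices):
 * \code
 * boost::lockfree::queue<int, boost::lockfree::capacity<1024> > q; // compile-time sized
 *
 * // any number of threads may push and pop concurrently:
 * q.push(42);
 *
 * int i;
 * while (q.pop(i))
 *     ; // consume i
 * \endcode
 *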
 * */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, class A0, class A1, class A2>
#else
template <typename T, typename ...Options>
#endif
class queue
{
private:
#ifndef BOOST_DOXYGEN_INVOKED

#ifdef BOOST_HAS_TRIVIAL_DESTRUCTOR
    BOOST_STATIC_ASSERT((boost::has_trivial_destructor<T>::value));
#endif

#ifdef BOOST_HAS_TRIVIAL_ASSIGN
    BOOST_STATIC_ASSERT((boost::has_trivial_assign<T>::value));
#endif

#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;
#else
    typedef typename detail::queue_signature::bind<Options...>::type bound_args;
#endif

    static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
    static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
    static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
    static const bool node_based = !(has_capacity || fixed_sized);
    static const bool compile_time_sized = has_capacity;

    struct BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES) node
    {
        typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
        typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

        node(T const & v, handle_type null_handle):
            data(v)
        {
            /* increment tag to avoid ABA problem */
            tagged_node_handle old_next = next.load(memory_order_relaxed);
            tagged_node_handle new_next (null_handle, old_next.get_next_tag());
            next.store(new_next, memory_order_release);
        }

        node (handle_type null_handle):
            next(tagged_node_handle(null_handle, 0))
        {}

        node(void)
        {}

        atomic<tagged_node_handle> next;
        T data;
    };

    typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
    typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
    typedef typename pool_t::tagged_node_handle tagged_node_handle;
    typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

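    /* Queue representation (Michael/Scott): head_ always points to a dummy node whose
     * data member is unused; the first real element lives in the successor of head_.
     * The queue is empty iff head_ and tail_ refer to the same node. This is why
     * 'capacity' above reserves one extra node and why initialize() installs a single
     * dummy node as both head_ and tail_.
     */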
    void initialize(void)
    {
        node * n = pool.template construct<true, false>(pool.null_handle());
        tagged_node_handle dummy_node(pool.get_handle(n), 0);
        head_.store(dummy_node, memory_order_relaxed);
        tail_.store(dummy_node, memory_order_release);
    }

    struct implementation_defined
    {
        typedef node_allocator allocator;
        typedef std::size_t size_type;
    };

#endif

    BOOST_DELETED_FUNCTION(queue(queue const&))
    BOOST_DELETED_FUNCTION(queue& operator= (queue const&))

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /**
     * \return true, if implementation is lock-free.
     *
     * \warning It only checks whether the queue head, the queue tail, and the freelist can be modified in a
     *          lock-free manner. If this is the case, the whole implementation is lock-free on most platforms.
     *          Using C++11-style atomics, there is no way to provide a completely accurate answer, because that
     *          would require testing every internal node, which is impossible if further nodes are allocated
     *          from the operating system.
     * */
    bool is_lock_free (void) const
    {
        return head_.is_lock_free() && tail_.is_lock_free() && pool.is_lock_free();
    }

    /** Construct a fixed-sized queue
     *
     * \pre Must specify a capacity<> argument
     * */
    queue(void):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), capacity)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a fixed-sized queue with a custom allocator
     *
     * \pre Must specify a capacity<> argument
     * */
    template <typename U>
    explicit queue(typename boost::allocator_rebind<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        BOOST_STATIC_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a fixed-sized queue with a custom allocator
     *
     * \pre Must specify a capacity<> argument
     * */
    explicit queue(allocator const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a variable-sized queue
     *
     *  Allocate n nodes initially for the freelist
     *
     * \pre Must \b not specify a capacity<> argument
     * */
    explicit queue(size_type n):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), n + 1)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(!has_capacity);
        initialize();
    }

    /** Construct a variable-sized queue with a custom allocator
     *
     *  Allocate n nodes initially for the freelist
     *
     * \pre Must \b not specify a capacity<> argument
     * */
    template <typename U>
    queue(size_type n, typename boost::allocator_rebind<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, n + 1)
    {
        BOOST_STATIC_ASSERT(!has_capacity);
        initialize();
    }
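    /* Usage sketch (illustrative values): the two sizing modes.
     *
     *   boost::lockfree::queue<int, boost::lockfree::capacity<128> > fixed; // size fixed at compile time
     *   boost::lockfree::queue<int> dynamic(128); // 128 nodes pre-allocated, grows on demand
     *   dynamic.reserve(64);                      // thread-safe growth of the freelist
     */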
    /** \copydoc boost::lockfree::stack::reserve
     * */
    void reserve(size_type n)
    {
        pool.template reserve<true>(n);
    }

    /** \copydoc boost::lockfree::stack::reserve_unsafe
     * */
    void reserve_unsafe(size_type n)
    {
        pool.template reserve<false>(n);
    }

    /** Destroys the queue, freeing all nodes from the freelist.
     * */
    ~queue(void)
    {
        T dummy;
        while(unsynchronized_pop(dummy))
        {}

        pool.template destruct<false>(head_.load(memory_order_relaxed));
    }

    /** Check if the queue is empty
     *
     * \return true, if the queue is empty, false otherwise
     * \note The result is only accurate if no other thread modifies the queue. Therefore it is rarely practical to use this
     *       value in program logic.
     * */
    bool empty(void) const
    {
        return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe. If the internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be
     *       allocated from the OS. This may not be lock-free.
     * */
    bool push(T const & t)
    {
        return do_push<false>(t);
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe and non-blocking. If the internal memory pool is exhausted, the operation will fail.
     * \throws if memory allocator throws
     * */
    bool bounded_push(T const & t)
    {
        return do_push<true>(t);
    }


private:
#ifndef BOOST_DOXYGEN_INVOKED
    template <bool Bounded>
    bool do_push(T const & t)
    {
        node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
        handle_type node_handle = pool.get_handle(n);

        if (n == NULL)
            return false;

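        /* Classic Michael/Scott enqueue: snapshot tail_ and tail->next, then re-read
         * tail_ to verify the snapshot is consistent. If the tail node has no
         * successor, link the new node with a CAS on tail->next and try to swing
         * tail_ forward; if it does have a successor, tail_ is lagging, so help the
         * other thread by advancing tail_ before retrying. The second CAS may fail
         * harmlessly; some other thread will then complete the tail_ update.
         */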
        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_acquire);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle tail2 = tail_.load(memory_order_acquire);
            if (BOOST_LIKELY(tail == tail2)) {
                if (next_ptr == 0) {
                    tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
                    if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
                        tagged_node_handle new_tail(node_handle, tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                        return true;
                    }
                }
                else {
                    tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);
                }
            }
        }
    }
#endif

public:

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Not thread-safe. If the internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be
     *       allocated from the OS. This may not be lock-free.
     * \throws if memory allocator throws
     * */
    bool unsynchronized_push(T const & t)
    {
        node * n = pool.template construct<false, false>(t, pool.null_handle());

        if (n == NULL)
            return false;

        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (next_ptr == 0) {
                tail_node->next.store(tagged_node_handle(pool.get_handle(n), next.get_next_tag()), memory_order_relaxed);
                tail_.store(tagged_node_handle(pool.get_handle(n), tail.get_next_tag()), memory_order_relaxed);
                return true;
            }
            else
                tail_.store(tagged_node_handle(pool.get_handle(next_ptr), tail.get_next_tag()), memory_order_relaxed);
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool pop (T & ret)
    {
        return pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible from T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename U>
    bool pop (U & ret)
    {
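        /* Michael/Scott dequeue: snapshot head_, tail_ and head->next, then re-read
         * head_ to verify consistency. If head equals tail, the queue is either empty
         * (no successor) or tail_ is lagging and gets helped forward. Otherwise the
         * payload is copied before the CAS on head_, because a successful CAS
         * releases the node to the freelist, where another thread may reuse it
         * immediately.
         */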
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_acquire);
            node * head_ptr = pool.get_pointer(head);

            tagged_node_handle tail = tail_.load(memory_order_acquire);
            tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle head2 = head_.load(memory_order_acquire);
            if (BOOST_LIKELY(head == head2)) {
                if (pool.get_handle(head) == pool.get_handle(tail)) {
                    if (next_ptr == 0)
                        return false;

                    tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);

                } else {
                    if (next_ptr == 0)
                        /* this check is not part of the original algorithm as published by Michael and Scott;
                         * however, we reuse the tagged_ptr part for the freelist and clear the next part during
                         * node allocation, so we can observe a null pointer here.
                         * */
                        continue;
                    detail::copy_payload(next_ptr->data, ret);

                    tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                    if (head_.compare_exchange_weak(head, new_head)) {
                        pool.template destruct<true>(head);
                        return true;
                    }
                }
            }
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    bool unsynchronized_pop (T & ret)
    {
        return unsynchronized_pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible from T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    template <typename U>
    bool unsynchronized_pop (U & ret)
    {
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_relaxed);
            node * head_ptr = pool.get_pointer(head);
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            tagged_node_handle next = head_ptr->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (pool.get_handle(head) == pool.get_handle(tail)) {
                if (next_ptr == 0)
                    return false;

                tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                tail_.store(new_tail);
            } else {
                if (next_ptr == 0)
                    /* this check is not part of the original algorithm as published by Michael and Scott;
                     * however, we reuse the tagged_ptr part for the freelist and clear the next part during
                     * node allocation, so we can observe a null pointer here.
                     * */
                    continue;
                detail::copy_payload(next_ptr->data, ret);
                tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                head_.store(new_head);
                pool.template destruct<false>(head);
                return true;
            }
        }
    }

    /** consumes one element via a functor
     *
     *  pops one element from the queue and applies the functor on this object
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    bool consume_one(Functor & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /// \copydoc boost::lockfree::queue::consume_one(Functor & rhs)
    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /** consumes all elements via a functor
     *
     *  sequentially pops all elements from the queue and applies the functor on each object
     *
     * \returns number of elements that are consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    size_t consume_all(Functor & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }

    /// \copydoc boost::lockfree::queue::consume_all(Functor & rhs)
    template <typename Functor>
    size_t consume_all(Functor const & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }
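    /* Usage sketch (the functor type is an illustrative choice): draining the queue.
     *
     *   struct printer {
     *       void operator()(int i) const { std::printf("%d\n", i); }
     *   };
     *
     *   boost::lockfree::queue<int> q(128);
     *   // ... producers push ...
     *   size_t consumed = q.consume_all(printer()); // pops every element, applying the functor to each
     */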

private:
#ifndef BOOST_DOXYGEN_INVOKED
    // pad head_ and tail_ to separate cache lines to avoid false sharing
    atomic<tagged_node_handle> head_;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
    char padding1[padding_size];
    atomic<tagged_node_handle> tail_;
    char padding2[padding_size];

    pool_t pool;
#endif
};

} /* namespace lockfree */
} /* namespace boost */

#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(pop)
#endif

#if defined(_MSC_VER)
#pragma warning(pop)
#endif

#endif /* BOOST_LOCKFREE_FIFO_HPP_INCLUDED */