1 // lock-free single-producer/single-consumer ringbuffer 2 // this algorithm is implemented in various projects (linux kernel) 3 // 4 // Copyright (C) 2009-2013 Tim Blechmann 5 // 6 // Distributed under the Boost Software License, Version 1.0. (See 7 // accompanying file LICENSE_1_0.txt or copy at 8 // http://www.boost.org/LICENSE_1_0.txt) 9 10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 12 13 #include <algorithm> 14 #include <memory> 15 16 #include <boost/aligned_storage.hpp> 17 #include <boost/assert.hpp> 18 #include <boost/static_assert.hpp> 19 #include <boost/core/allocator_access.hpp> 20 #include <boost/utility.hpp> 21 #include <boost/next_prior.hpp> 22 #include <boost/utility/enable_if.hpp> 23 #include <boost/config.hpp> // for BOOST_LIKELY 24 25 #include <boost/type_traits/has_trivial_destructor.hpp> 26 #include <boost/type_traits/is_convertible.hpp> 27 28 #include <boost/lockfree/detail/atomic.hpp> 29 #include <boost/lockfree/detail/copy_payload.hpp> 30 #include <boost/lockfree/detail/parameter.hpp> 31 #include <boost/lockfree/detail/prefix.hpp> 32 33 #include <boost/lockfree/lockfree_forward.hpp> 34 35 #ifdef BOOST_HAS_PRAGMA_ONCE 36 #pragma once 37 #endif 38 39 namespace boost { 40 namespace lockfree { 41 namespace detail { 42 43 typedef parameter::parameters<boost::parameter::optional<tag::capacity>, 44 boost::parameter::optional<tag::allocator> 45 > ringbuffer_signature; 46 47 template <typename T> 48 class ringbuffer_base 49 { 50 #ifndef BOOST_DOXYGEN_INVOKED 51 protected: 52 typedef std::size_t size_t; 53 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t); 54 atomic<size_t> write_index_; 55 char padding1[padding_size]; /* force read_index and write_index to different cache lines */ 56 atomic<size_t> read_index_; 57 58 BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&)) 59 BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&)) 60 61 protected: ringbuffer_base(void)62 ringbuffer_base(void): 63 write_index_(0), read_index_(0) 64 {} 65 next_index(size_t arg,size_t max_size)66 static size_t next_index(size_t arg, size_t max_size) 67 { 68 size_t ret = arg + 1; 69 while (BOOST_UNLIKELY(ret >= max_size)) 70 ret -= max_size; 71 return ret; 72 } 73 read_available(size_t write_index,size_t read_index,size_t max_size)74 static size_t read_available(size_t write_index, size_t read_index, size_t max_size) 75 { 76 if (write_index >= read_index) 77 return write_index - read_index; 78 79 const size_t ret = write_index + max_size - read_index; 80 return ret; 81 } 82 write_available(size_t write_index,size_t read_index,size_t max_size)83 static size_t write_available(size_t write_index, size_t read_index, size_t max_size) 84 { 85 size_t ret = read_index - write_index - 1; 86 if (write_index >= read_index) 87 ret += max_size; 88 return ret; 89 } 90 read_available(size_t max_size) const91 size_t read_available(size_t max_size) const 92 { 93 size_t write_index = write_index_.load(memory_order_acquire); 94 const size_t read_index = read_index_.load(memory_order_relaxed); 95 return read_available(write_index, read_index, max_size); 96 } 97 write_available(size_t max_size) const98 size_t write_available(size_t max_size) const 99 { 100 size_t write_index = write_index_.load(memory_order_relaxed); 101 const size_t read_index = read_index_.load(memory_order_acquire); 102 return write_available(write_index, read_index, max_size); 103 } 104 push(T const & t,T * buffer,size_t max_size)105 bool push(T const & t, T * buffer, size_t max_size) 106 { 107 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread 108 const size_t next = next_index(write_index, max_size); 109 110 if (next == read_index_.load(memory_order_acquire)) 111 return false; /* ringbuffer is full */ 112 113 new (buffer + write_index) T(t); // copy-construct 114 115 write_index_.store(next, memory_order_release); 116 117 return true; 118 } 119 push(const T * input_buffer,size_t input_count,T * internal_buffer,size_t max_size)120 size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size) 121 { 122 return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer; 123 } 124 125 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end,T * internal_buffer,size_t max_size)126 ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size) 127 { 128 // FIXME: avoid std::distance 129 130 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread 131 const size_t read_index = read_index_.load(memory_order_acquire); 132 const size_t avail = write_available(write_index, read_index, max_size); 133 134 if (avail == 0) 135 return begin; 136 137 size_t input_count = std::distance(begin, end); 138 input_count = (std::min)(input_count, avail); 139 140 size_t new_write_index = write_index + input_count; 141 142 const ConstIterator last = boost::next(begin, input_count); 143 144 if (write_index + input_count > max_size) { 145 /* copy data in two sections */ 146 const size_t count0 = max_size - write_index; 147 const ConstIterator midpoint = boost::next(begin, count0); 148 149 std::uninitialized_copy(begin, midpoint, internal_buffer + write_index); 150 std::uninitialized_copy(midpoint, last, internal_buffer); 151 new_write_index -= max_size; 152 } else { 153 std::uninitialized_copy(begin, last, internal_buffer + write_index); 154 155 if (new_write_index == max_size) 156 new_write_index = 0; 157 } 158 159 write_index_.store(new_write_index, memory_order_release); 160 return last; 161 } 162 163 template <typename Functor> consume_one(Functor & functor,T * buffer,size_t max_size)164 bool consume_one(Functor & functor, T * buffer, size_t max_size) 165 { 166 const size_t write_index = write_index_.load(memory_order_acquire); 167 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 168 if ( empty(write_index, read_index) ) 169 return false; 170 171 T & object_to_consume = buffer[read_index]; 172 functor( object_to_consume ); 173 object_to_consume.~T(); 174 175 size_t next = next_index(read_index, max_size); 176 read_index_.store(next, memory_order_release); 177 return true; 178 } 179 180 template <typename Functor> consume_one(Functor const & functor,T * buffer,size_t max_size)181 bool consume_one(Functor const & functor, T * buffer, size_t max_size) 182 { 183 const size_t write_index = write_index_.load(memory_order_acquire); 184 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 185 if ( empty(write_index, read_index) ) 186 return false; 187 188 T & object_to_consume = buffer[read_index]; 189 functor( object_to_consume ); 190 object_to_consume.~T(); 191 192 size_t next = next_index(read_index, max_size); 193 read_index_.store(next, memory_order_release); 194 return true; 195 } 196 197 template <typename Functor> consume_all(Functor const & functor,T * internal_buffer,size_t max_size)198 size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size) 199 { 200 const size_t write_index = write_index_.load(memory_order_acquire); 201 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 202 203 const size_t avail = read_available(write_index, read_index, max_size); 204 205 if (avail == 0) 206 return 0; 207 208 const size_t output_count = avail; 209 210 size_t new_read_index = read_index + output_count; 211 212 if (read_index + output_count > max_size) { 213 /* copy data in two sections */ 214 const size_t count0 = max_size - read_index; 215 const size_t count1 = output_count - count0; 216 217 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); 218 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); 219 220 new_read_index -= max_size; 221 } else { 222 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); 223 224 if (new_read_index == max_size) 225 new_read_index = 0; 226 } 227 228 read_index_.store(new_read_index, memory_order_release); 229 return output_count; 230 } 231 232 template <typename Functor> consume_all(Functor & functor,T * internal_buffer,size_t max_size)233 size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size) 234 { 235 const size_t write_index = write_index_.load(memory_order_acquire); 236 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 237 238 const size_t avail = read_available(write_index, read_index, max_size); 239 240 if (avail == 0) 241 return 0; 242 243 const size_t output_count = avail; 244 245 size_t new_read_index = read_index + output_count; 246 247 if (read_index + output_count > max_size) { 248 /* copy data in two sections */ 249 const size_t count0 = max_size - read_index; 250 const size_t count1 = output_count - count0; 251 252 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); 253 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); 254 255 new_read_index -= max_size; 256 } else { 257 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); 258 259 if (new_read_index == max_size) 260 new_read_index = 0; 261 } 262 263 read_index_.store(new_read_index, memory_order_release); 264 return output_count; 265 } 266 pop(T * output_buffer,size_t output_count,T * internal_buffer,size_t max_size)267 size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size) 268 { 269 const size_t write_index = write_index_.load(memory_order_acquire); 270 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 271 272 const size_t avail = read_available(write_index, read_index, max_size); 273 274 if (avail == 0) 275 return 0; 276 277 output_count = (std::min)(output_count, avail); 278 279 size_t new_read_index = read_index + output_count; 280 281 if (read_index + output_count > max_size) { 282 /* copy data in two sections */ 283 const size_t count0 = max_size - read_index; 284 const size_t count1 = output_count - count0; 285 286 copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer); 287 copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0); 288 289 new_read_index -= max_size; 290 } else { 291 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer); 292 if (new_read_index == max_size) 293 new_read_index = 0; 294 } 295 296 read_index_.store(new_read_index, memory_order_release); 297 return output_count; 298 } 299 300 template <typename OutputIterator> pop_to_output_iterator(OutputIterator it,T * internal_buffer,size_t max_size)301 size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size) 302 { 303 const size_t write_index = write_index_.load(memory_order_acquire); 304 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 305 306 const size_t avail = read_available(write_index, read_index, max_size); 307 if (avail == 0) 308 return 0; 309 310 size_t new_read_index = read_index + avail; 311 312 if (read_index + avail > max_size) { 313 /* copy data in two sections */ 314 const size_t count0 = max_size - read_index; 315 const size_t count1 = avail - count0; 316 317 it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it); 318 copy_and_delete(internal_buffer, internal_buffer + count1, it); 319 320 new_read_index -= max_size; 321 } else { 322 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it); 323 if (new_read_index == max_size) 324 new_read_index = 0; 325 } 326 327 read_index_.store(new_read_index, memory_order_release); 328 return avail; 329 } 330 front(const T * internal_buffer) const331 const T& front(const T * internal_buffer) const 332 { 333 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 334 return *(internal_buffer + read_index); 335 } 336 front(T * internal_buffer)337 T& front(T * internal_buffer) 338 { 339 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 340 return *(internal_buffer + read_index); 341 } 342 #endif 343 344 345 public: 346 /** reset the ringbuffer 347 * 348 * \note Not thread-safe 349 * */ reset(void)350 void reset(void) 351 { 352 if ( !boost::has_trivial_destructor<T>::value ) { 353 // make sure to call all destructors! 354 355 T dummy_element; 356 while (pop(dummy_element)) 357 {} 358 } else { 359 write_index_.store(0, memory_order_relaxed); 360 read_index_.store(0, memory_order_release); 361 } 362 } 363 364 /** Check if the ringbuffer is empty 365 * 366 * \return true, if the ringbuffer is empty, false otherwise 367 * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate. 368 * */ empty(void)369 bool empty(void) 370 { 371 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed)); 372 } 373 374 /** 375 * \return true, if implementation is lock-free. 376 * 377 * */ is_lock_free(void) const378 bool is_lock_free(void) const 379 { 380 return write_index_.is_lock_free() && read_index_.is_lock_free(); 381 } 382 383 private: empty(size_t write_index,size_t read_index)384 bool empty(size_t write_index, size_t read_index) 385 { 386 return write_index == read_index; 387 } 388 389 template< class OutputIterator > copy_and_delete(T * first,T * last,OutputIterator out)390 OutputIterator copy_and_delete( T * first, T * last, OutputIterator out ) 391 { 392 if (boost::has_trivial_destructor<T>::value) { 393 return std::copy(first, last, out); // will use memcpy if possible 394 } else { 395 for (; first != last; ++first, ++out) { 396 *out = *first; 397 first->~T(); 398 } 399 return out; 400 } 401 } 402 403 template< class Functor > run_functor_and_delete(T * first,T * last,Functor & functor)404 void run_functor_and_delete( T * first, T * last, Functor & functor ) 405 { 406 for (; first != last; ++first) { 407 functor(*first); 408 first->~T(); 409 } 410 } 411 412 template< class Functor > run_functor_and_delete(T * first,T * last,Functor const & functor)413 void run_functor_and_delete( T * first, T * last, Functor const & functor ) 414 { 415 for (; first != last; ++first) { 416 functor(*first); 417 first->~T(); 418 } 419 } 420 }; 421 422 template <typename T, std::size_t MaxSize> 423 class compile_time_sized_ringbuffer: 424 public ringbuffer_base<T> 425 { 426 typedef std::size_t size_type; 427 static const std::size_t max_size = MaxSize + 1; 428 429 typedef typename boost::aligned_storage<max_size * sizeof(T), 430 boost::alignment_of<T>::value 431 >::type storage_type; 432 433 storage_type storage_; 434 data()435 T * data() 436 { 437 return static_cast<T*>(storage_.address()); 438 } 439 data() const440 const T * data() const 441 { 442 return static_cast<const T*>(storage_.address()); 443 } 444 445 protected: max_number_of_elements() const446 size_type max_number_of_elements() const 447 { 448 return max_size; 449 } 450 451 public: push(T const & t)452 bool push(T const & t) 453 { 454 return ringbuffer_base<T>::push(t, data(), max_size); 455 } 456 457 template <typename Functor> consume_one(Functor & f)458 bool consume_one(Functor & f) 459 { 460 return ringbuffer_base<T>::consume_one(f, data(), max_size); 461 } 462 463 template <typename Functor> consume_one(Functor const & f)464 bool consume_one(Functor const & f) 465 { 466 return ringbuffer_base<T>::consume_one(f, data(), max_size); 467 } 468 469 template <typename Functor> consume_all(Functor & f)470 size_type consume_all(Functor & f) 471 { 472 return ringbuffer_base<T>::consume_all(f, data(), max_size); 473 } 474 475 template <typename Functor> consume_all(Functor const & f)476 size_type consume_all(Functor const & f) 477 { 478 return ringbuffer_base<T>::consume_all(f, data(), max_size); 479 } 480 push(T const * t,size_type size)481 size_type push(T const * t, size_type size) 482 { 483 return ringbuffer_base<T>::push(t, size, data(), max_size); 484 } 485 486 template <size_type size> push(T const (& t)[size])487 size_type push(T const (&t)[size]) 488 { 489 return push(t, size); 490 } 491 492 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end)493 ConstIterator push(ConstIterator begin, ConstIterator end) 494 { 495 return ringbuffer_base<T>::push(begin, end, data(), max_size); 496 } 497 pop(T * ret,size_type size)498 size_type pop(T * ret, size_type size) 499 { 500 return ringbuffer_base<T>::pop(ret, size, data(), max_size); 501 } 502 503 template <typename OutputIterator> pop_to_output_iterator(OutputIterator it)504 size_type pop_to_output_iterator(OutputIterator it) 505 { 506 return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size); 507 } 508 front(void) const509 const T& front(void) const 510 { 511 return ringbuffer_base<T>::front(data()); 512 } 513 front(void)514 T& front(void) 515 { 516 return ringbuffer_base<T>::front(data()); 517 } 518 }; 519 520 template <typename T, typename Alloc> 521 class runtime_sized_ringbuffer: 522 public ringbuffer_base<T>, 523 private Alloc 524 { 525 typedef std::size_t size_type; 526 size_type max_elements_; 527 #ifdef BOOST_NO_CXX11_ALLOCATOR 528 typedef typename Alloc::pointer pointer; 529 #else 530 typedef std::allocator_traits<Alloc> allocator_traits; 531 typedef typename allocator_traits::pointer pointer; 532 #endif 533 pointer array_; 534 535 protected: max_number_of_elements() const536 size_type max_number_of_elements() const 537 { 538 return max_elements_; 539 } 540 541 public: runtime_sized_ringbuffer(size_type max_elements)542 explicit runtime_sized_ringbuffer(size_type max_elements): 543 max_elements_(max_elements + 1) 544 { 545 #ifdef BOOST_NO_CXX11_ALLOCATOR 546 array_ = Alloc::allocate(max_elements_); 547 #else 548 Alloc& alloc = *this; 549 array_ = allocator_traits::allocate(alloc, max_elements_); 550 #endif 551 } 552 553 template <typename U> runtime_sized_ringbuffer(typename boost::allocator_rebind<Alloc,U>::type const & alloc,size_type max_elements)554 runtime_sized_ringbuffer(typename boost::allocator_rebind<Alloc, U>::type const & alloc, size_type max_elements): 555 Alloc(alloc), max_elements_(max_elements + 1) 556 { 557 #ifdef BOOST_NO_CXX11_ALLOCATOR 558 array_ = Alloc::allocate(max_elements_); 559 #else 560 Alloc& allocator = *this; 561 array_ = allocator_traits::allocate(allocator, max_elements_); 562 #endif 563 } 564 runtime_sized_ringbuffer(Alloc const & alloc,size_type max_elements)565 runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements): 566 Alloc(alloc), max_elements_(max_elements + 1) 567 { 568 #ifdef BOOST_NO_CXX11_ALLOCATOR 569 array_ = Alloc::allocate(max_elements_); 570 #else 571 Alloc& allocator = *this; 572 array_ = allocator_traits::allocate(allocator, max_elements_); 573 #endif 574 } 575 ~runtime_sized_ringbuffer(void)576 ~runtime_sized_ringbuffer(void) 577 { 578 // destroy all remaining items 579 T out; 580 while (pop(&out, 1)) {} 581 582 #ifdef BOOST_NO_CXX11_ALLOCATOR 583 Alloc::deallocate(array_, max_elements_); 584 #else 585 Alloc& allocator = *this; 586 allocator_traits::deallocate(allocator, array_, max_elements_); 587 #endif 588 } 589 push(T const & t)590 bool push(T const & t) 591 { 592 return ringbuffer_base<T>::push(t, &*array_, max_elements_); 593 } 594 595 template <typename Functor> consume_one(Functor & f)596 bool consume_one(Functor & f) 597 { 598 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_); 599 } 600 601 template <typename Functor> consume_one(Functor const & f)602 bool consume_one(Functor const & f) 603 { 604 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_); 605 } 606 607 template <typename Functor> consume_all(Functor & f)608 size_type consume_all(Functor & f) 609 { 610 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_); 611 } 612 613 template <typename Functor> consume_all(Functor const & f)614 size_type consume_all(Functor const & f) 615 { 616 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_); 617 } 618 push(T const * t,size_type size)619 size_type push(T const * t, size_type size) 620 { 621 return ringbuffer_base<T>::push(t, size, &*array_, max_elements_); 622 } 623 624 template <size_type size> push(T const (& t)[size])625 size_type push(T const (&t)[size]) 626 { 627 return push(t, size); 628 } 629 630 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end)631 ConstIterator push(ConstIterator begin, ConstIterator end) 632 { 633 return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_); 634 } 635 pop(T * ret,size_type size)636 size_type pop(T * ret, size_type size) 637 { 638 return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_); 639 } 640 641 template <typename OutputIterator> pop_to_output_iterator(OutputIterator it)642 size_type pop_to_output_iterator(OutputIterator it) 643 { 644 return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_); 645 } 646 front(void) const647 const T& front(void) const 648 { 649 return ringbuffer_base<T>::front(&*array_); 650 } 651 front(void)652 T& front(void) 653 { 654 return ringbuffer_base<T>::front(&*array_); 655 } 656 }; 657 658 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 659 template <typename T, typename A0, typename A1> 660 #else 661 template <typename T, typename ...Options> 662 #endif 663 struct make_ringbuffer 664 { 665 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 666 typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args; 667 #else 668 typedef typename ringbuffer_signature::bind<Options...>::type bound_args; 669 #endif 670 671 typedef extract_capacity<bound_args> extract_capacity_t; 672 673 static const bool runtime_sized = !extract_capacity_t::has_capacity; 674 static const size_t capacity = extract_capacity_t::capacity; 675 676 typedef extract_allocator<bound_args, T> extract_allocator_t; 677 typedef typename extract_allocator_t::type allocator; 678 679 // allocator argument is only sane, for run-time sized ringbuffers 680 BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>, 681 mpl::bool_<!extract_allocator_t::has_allocator>, 682 mpl::true_ 683 >::type::value)); 684 685 typedef typename mpl::if_c<runtime_sized, 686 runtime_sized_ringbuffer<T, allocator>, 687 compile_time_sized_ringbuffer<T, capacity> 688 >::type ringbuffer_type; 689 }; 690 691 692 } /* namespace detail */ 693 694 695 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free. 696 * 697 * \b Policies: 698 * - \c boost::lockfree::capacity<>, optional <br> 699 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time. 700 * 701 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br> 702 * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured 703 * to be sized at run-time 704 * 705 * \b Requirements: 706 * - T must have a default constructor 707 * - T must be copyable 708 * */ 709 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 710 template <typename T, class A0, class A1> 711 #else 712 template <typename T, typename ...Options> 713 #endif 714 class spsc_queue: 715 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 716 public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type 717 #else 718 public detail::make_ringbuffer<T, Options...>::ringbuffer_type 719 #endif 720 { 721 private: 722 723 #ifndef BOOST_DOXYGEN_INVOKED 724 725 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES 726 typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type; 727 static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized; 728 typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg; 729 #else 730 typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type; 731 static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized; 732 typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg; 733 #endif 734 735 736 struct implementation_defined 737 { 738 typedef allocator_arg allocator; 739 typedef std::size_t size_type; 740 }; 741 #endif 742 743 public: 744 typedef T value_type; 745 typedef typename implementation_defined::allocator allocator; 746 typedef typename implementation_defined::size_type size_type; 747 748 /** Constructs a spsc_queue 749 * 750 * \pre spsc_queue must be configured to be sized at compile-time 751 */ spsc_queue(void)752 spsc_queue(void) 753 { 754 // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling 755 // this function and this function may be compiled even when it isn't being used. 756 BOOST_ASSERT(!runtime_sized); 757 } 758 759 /** Constructs a spsc_queue with a custom allocator 760 * 761 * \pre spsc_queue must be configured to be sized at compile-time 762 * 763 * \note This is just for API compatibility: an allocator isn't actually needed 764 */ 765 template <typename U> spsc_queue(typename boost::allocator_rebind<allocator,U>::type const &)766 explicit spsc_queue(typename boost::allocator_rebind<allocator, U>::type const &) 767 { 768 BOOST_STATIC_ASSERT(!runtime_sized); 769 } 770 771 /** Constructs a spsc_queue with a custom allocator 772 * 773 * \pre spsc_queue must be configured to be sized at compile-time 774 * 775 * \note This is just for API compatibility: an allocator isn't actually needed 776 */ spsc_queue(allocator const &)777 explicit spsc_queue(allocator const &) 778 { 779 // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling 780 // this function and this function may be compiled even when it isn't being used. 781 BOOST_ASSERT(!runtime_sized); 782 } 783 784 /** Constructs a spsc_queue for element_count elements 785 * 786 * \pre spsc_queue must be configured to be sized at run-time 787 */ spsc_queue(size_type element_count)788 explicit spsc_queue(size_type element_count): 789 base_type(element_count) 790 { 791 // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling 792 // this function and this function may be compiled even when it isn't being used. 793 BOOST_ASSERT(runtime_sized); 794 } 795 796 /** Constructs a spsc_queue for element_count elements with a custom allocator 797 * 798 * \pre spsc_queue must be configured to be sized at run-time 799 */ 800 template <typename U> spsc_queue(size_type element_count,typename boost::allocator_rebind<allocator,U>::type const & alloc)801 spsc_queue(size_type element_count, typename boost::allocator_rebind<allocator, U>::type const & alloc): 802 base_type(alloc, element_count) 803 { 804 BOOST_STATIC_ASSERT(runtime_sized); 805 } 806 807 /** Constructs a spsc_queue for element_count elements with a custom allocator 808 * 809 * \pre spsc_queue must be configured to be sized at run-time 810 */ spsc_queue(size_type element_count,allocator_arg const & alloc)811 spsc_queue(size_type element_count, allocator_arg const & alloc): 812 base_type(alloc, element_count) 813 { 814 // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling 815 // this function and this function may be compiled even when it isn't being used. 816 BOOST_ASSERT(runtime_sized); 817 } 818 819 /** Pushes object t to the ringbuffer. 820 * 821 * \pre only one thread is allowed to push data to the spsc_queue 822 * \post object will be pushed to the spsc_queue, unless it is full. 823 * \return true, if the push operation is successful. 824 * 825 * \note Thread-safe and wait-free 826 * */ push(T const & t)827 bool push(T const & t) 828 { 829 return base_type::push(t); 830 } 831 832 /** Pops one object from ringbuffer. 833 * 834 * \pre only one thread is allowed to pop data to the spsc_queue 835 * \post if ringbuffer is not empty, object will be discarded. 836 * \return true, if the pop operation is successful, false if ringbuffer was empty. 837 * 838 * \note Thread-safe and wait-free 839 */ pop()840 bool pop () 841 { 842 detail::consume_noop consume_functor; 843 return consume_one( consume_functor ); 844 } 845 846 /** Pops one object from ringbuffer. 847 * 848 * \pre only one thread is allowed to pop data to the spsc_queue 849 * \post if ringbuffer is not empty, object will be copied to ret. 850 * \return true, if the pop operation is successful, false if ringbuffer was empty. 851 * 852 * \note Thread-safe and wait-free 853 */ 854 template <typename U> 855 typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type pop(U & ret)856 pop (U & ret) 857 { 858 detail::consume_via_copy<U> consume_functor(ret); 859 return consume_one( consume_functor ); 860 } 861 862 /** Pushes as many objects from the array t as there is space. 863 * 864 * \pre only one thread is allowed to push data to the spsc_queue 865 * \return number of pushed items 866 * 867 * \note Thread-safe and wait-free 868 */ push(T const * t,size_type size)869 size_type push(T const * t, size_type size) 870 { 871 return base_type::push(t, size); 872 } 873 874 /** Pushes as many objects from the array t as there is space available. 875 * 876 * \pre only one thread is allowed to push data to the spsc_queue 877 * \return number of pushed items 878 * 879 * \note Thread-safe and wait-free 880 */ 881 template <size_type size> push(T const (& t)[size])882 size_type push(T const (&t)[size]) 883 { 884 return push(t, size); 885 } 886 887 /** Pushes as many objects from the range [begin, end) as there is space . 888 * 889 * \pre only one thread is allowed to push data to the spsc_queue 890 * \return iterator to the first element, which has not been pushed 891 * 892 * \note Thread-safe and wait-free 893 */ 894 template <typename ConstIterator> push(ConstIterator begin,ConstIterator end)895 ConstIterator push(ConstIterator begin, ConstIterator end) 896 { 897 return base_type::push(begin, end); 898 } 899 900 /** Pops a maximum of size objects from ringbuffer. 901 * 902 * \pre only one thread is allowed to pop data to the spsc_queue 903 * \return number of popped items 904 * 905 * \note Thread-safe and wait-free 906 * */ pop(T * ret,size_type size)907 size_type pop(T * ret, size_type size) 908 { 909 return base_type::pop(ret, size); 910 } 911 912 /** Pops a maximum of size objects from spsc_queue. 913 * 914 * \pre only one thread is allowed to pop data to the spsc_queue 915 * \return number of popped items 916 * 917 * \note Thread-safe and wait-free 918 * */ 919 template <size_type size> pop(T (& ret)[size])920 size_type pop(T (&ret)[size]) 921 { 922 return pop(ret, size); 923 } 924 925 /** Pops objects to the output iterator it 926 * 927 * \pre only one thread is allowed to pop data to the spsc_queue 928 * \return number of popped items 929 * 930 * \note Thread-safe and wait-free 931 * */ 932 template <typename OutputIterator> 933 typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type pop(OutputIterator it)934 pop(OutputIterator it) 935 { 936 return base_type::pop_to_output_iterator(it); 937 } 938 939 /** consumes one element via a functor 940 * 941 * pops one element from the queue and applies the functor on this object 942 * 943 * \returns true, if one element was consumed 944 * 945 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 946 * */ 947 template <typename Functor> consume_one(Functor & f)948 bool consume_one(Functor & f) 949 { 950 return base_type::consume_one(f); 951 } 952 953 /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs) 954 template <typename Functor> consume_one(Functor const & f)955 bool consume_one(Functor const & f) 956 { 957 return base_type::consume_one(f); 958 } 959 960 /** consumes all elements via a functor 961 * 962 * sequentially pops all elements from the queue and applies the functor on each object 963 * 964 * \returns number of elements that are consumed 965 * 966 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 967 * */ 968 template <typename Functor> consume_all(Functor & f)969 size_type consume_all(Functor & f) 970 { 971 return base_type::consume_all(f); 972 } 973 974 /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs) 975 template <typename Functor> consume_all(Functor const & f)976 size_type consume_all(Functor const & f) 977 { 978 return base_type::consume_all(f); 979 } 980 981 /** get number of elements that are available for read 982 * 983 * \return number of available elements that can be popped from the spsc_queue 984 * 985 * \note Thread-safe and wait-free, should only be called from the consumer thread 986 * */ read_available() const987 size_type read_available() const 988 { 989 return base_type::read_available(base_type::max_number_of_elements()); 990 } 991 992 /** get write space to write elements 993 * 994 * \return number of elements that can be pushed to the spsc_queue 995 * 996 * \note Thread-safe and wait-free, should only be called from the producer thread 997 * */ write_available() const998 size_type write_available() const 999 { 1000 return base_type::write_available(base_type::max_number_of_elements()); 1001 } 1002 1003 /** get reference to element in the front of the queue 1004 * 1005 * Availability of front element can be checked using read_available(). 1006 * 1007 * \pre only a consuming thread is allowed to check front element 1008 * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method. 1009 * \return reference to the first element in the queue 1010 * 1011 * \note Thread-safe and wait-free 1012 */ front() const1013 const T& front() const 1014 { 1015 BOOST_ASSERT(read_available() > 0); 1016 return base_type::front(); 1017 } 1018 1019 /// \copydoc boost::lockfree::spsc_queue::front() const front()1020 T& front() 1021 { 1022 BOOST_ASSERT(read_available() > 0); 1023 return base_type::front(); 1024 } 1025 1026 /** reset the ringbuffer 1027 * 1028 * \note Not thread-safe 1029 * */ reset(void)1030 void reset(void) 1031 { 1032 if ( !boost::has_trivial_destructor<T>::value ) { 1033 // make sure to call all destructors! 1034 1035 T dummy_element; 1036 while (pop(dummy_element)) 1037 {} 1038 } else { 1039 base_type::write_index_.store(0, memory_order_relaxed); 1040 base_type::read_index_.store(0, memory_order_release); 1041 } 1042 } 1043 }; 1044 1045 } /* namespace lockfree */ 1046 } /* namespace boost */ 1047 1048 1049 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */ 1050