// NOTE(review): removed HTML code-browser navigation chrome ("Home / Line# /
// Scopes# / Navigate / Raw / Download") that leaked into this extraction; it
// is not part of the original source file.
1 //  lock-free single-producer/single-consumer ringbuffer
2 //  this algorithm is implemented in various projects (linux kernel)
3 //
4 //  Copyright (C) 2009-2013 Tim Blechmann
5 //
6 //  Distributed under the Boost Software License, Version 1.0. (See
7 //  accompanying file LICENSE_1_0.txt or copy at
8 //  http://www.boost.org/LICENSE_1_0.txt)
9 
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
12 
13 #include <algorithm>
14 #include <memory>
15 
16 #include <boost/aligned_storage.hpp>
17 #include <boost/assert.hpp>
18 #include <boost/static_assert.hpp>
19 #include <boost/core/allocator_access.hpp>
20 #include <boost/utility.hpp>
21 #include <boost/next_prior.hpp>
22 #include <boost/utility/enable_if.hpp>
23 #include <boost/config.hpp> // for BOOST_LIKELY
24 
25 #include <boost/type_traits/has_trivial_destructor.hpp>
26 #include <boost/type_traits/is_convertible.hpp>
27 
28 #include <boost/lockfree/detail/atomic.hpp>
29 #include <boost/lockfree/detail/copy_payload.hpp>
30 #include <boost/lockfree/detail/parameter.hpp>
31 #include <boost/lockfree/detail/prefix.hpp>
32 
33 #include <boost/lockfree/lockfree_forward.hpp>
34 
35 #ifdef BOOST_HAS_PRAGMA_ONCE
36 #pragma once
37 #endif
38 
39 namespace boost    {
40 namespace lockfree {
41 namespace detail   {
42 
// Boost.Parameter signature for the spsc_queue policy options: both the
// capacity<> and allocator<> template arguments are optional and may be
// given in either order.
typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
                              boost::parameter::optional<tag::allocator>
                             > ringbuffer_signature;
46 
/** Lock-free single-producer/single-consumer ringbuffer core.
 *
 *  Storage is supplied by the derived class and passed into every operation;
 *  this base only manages the two indices.  One slot is deliberately left
 *  unused so that a full buffer (next_index(write) == read) can be told apart
 *  from an empty one (write == read).
 *
 *  Synchronization protocol (visible in the loads/stores below): each index
 *  is written by exactly one thread — write_index_ only by the producer,
 *  read_index_ only by the consumer.  A thread loads its own index with
 *  memory_order_relaxed and the other thread's index with
 *  memory_order_acquire, and publishes its own index with
 *  memory_order_release only after the payload has been constructed
 *  (producer) or destroyed (consumer).
 */
template <typename T>
class ringbuffer_base
{
#ifndef BOOST_DOXYGEN_INVOKED
protected:
    typedef std::size_t size_t;
    // pad so the two indices cannot share a cache line (avoids false sharing
    // between producer and consumer)
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
    atomic<size_t> write_index_;
    char padding1[padding_size]; /* force read_index and write_index to different cache lines */
    atomic<size_t> read_index_;

    // non-copyable: copying live indices without the externally-owned storage
    // would be meaningless
    BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
    BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))

protected:
    ringbuffer_base(void):
        write_index_(0), read_index_(0)
    {}

    /* advance an index by one, wrapping at max_size; a conditional subtract
     * is used instead of '%' because wrapping is the uncommon case */
    static size_t next_index(size_t arg, size_t max_size)
    {
        size_t ret = arg + 1;
        while (BOOST_UNLIKELY(ret >= max_size))
            ret -= max_size;
        return ret;
    }

    /* number of elements available for popping, given a snapshot of both
     * indices (handles the wrapped case) */
    static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
    {
        if (write_index >= read_index)
            return write_index - read_index;

        const size_t ret = write_index + max_size - read_index;
        return ret;
    }

    /* number of free slots for pushing; "- 1" keeps one slot reserved so
     * full and empty states remain distinguishable */
    static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
    {
        size_t ret = read_index - write_index - 1;
        if (write_index >= read_index)
            ret += max_size;
        return ret;
    }

    /* consumer-side query: own index relaxed, producer's index acquire */
    size_t read_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed);
        return read_available(write_index, read_index, max_size);
    }

    /* producer-side query: own index relaxed, consumer's index acquire */
    size_t write_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_relaxed);
        const size_t read_index  = read_index_.load(memory_order_acquire);
        return write_available(write_index, read_index, max_size);
    }

    /* push a single element; returns false if the ringbuffer is full */
    bool push(T const & t, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
        const size_t next = next_index(write_index, max_size);

        if (next == read_index_.load(memory_order_acquire))
            return false; /* ringbuffer is full */

        new (buffer + write_index) T(t); // copy-construct

        // release store: the constructed element must be visible to the
        // consumer before the advanced write index is
        write_index_.store(next, memory_order_release);

        return true;
    }

    /* push up to input_count elements from an array; returns the number of
     * elements actually pushed */
    size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
    {
        return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
    }

    /* push elements from [begin, end); returns an iterator one past the last
     * element pushed (== begin if the buffer was full).  May copy in two
     * sections when the write region wraps around the end of the storage. */
    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
    {
        // FIXME: avoid std::distance

        const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
        const size_t read_index  = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return begin;

        size_t input_count = std::distance(begin, end);
        input_count = (std::min)(input_count, avail);

        size_t new_write_index = write_index + input_count;

        const ConstIterator last = boost::next(begin, input_count);

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - write_index;
            const ConstIterator midpoint = boost::next(begin, count0);

            std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
            std::uninitialized_copy(midpoint, last, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::uninitialized_copy(begin, last, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return last;
    }

    /* pop one element into the functor (mutable-functor overload); the
     * element is destroyed before the read index is published */
    template <typename Functor>
    bool consume_one(Functor & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    /* const-functor overload; intentionally duplicated (pre-C++11 code base,
     * no perfect forwarding) */
    template <typename Functor>
    bool consume_one(Functor const & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    /* pop every available element through the functor; returns the number of
     * elements consumed (const-functor overload) */
    template <typename Functor>
    size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    /* mutable-functor overload of consume_all; intentionally duplicated */
    template <typename Functor>
    size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    /* pop up to output_count elements into output_buffer; returns the number
     * of elements actually popped */
    size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        output_count = (std::min)(output_count, avail);

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
            copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    /* pop every available element to an output iterator; returns the number
     * of elements written */
    template <typename OutputIterator>
    size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);
        if (avail == 0)
            return 0;

        size_t new_read_index = read_index + avail;

        if (read_index + avail > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = avail - count0;

            it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
            copy_and_delete(internal_buffer, internal_buffer + count1, it);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return avail;
    }

    /* reference to the oldest element; caller must ensure the buffer is
     * non-empty (no check is performed here) */
    const T& front(const T * internal_buffer) const
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }

    T& front(T * internal_buffer)
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }
#endif


public:
    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!
            // NOTE(review): this requires T to be default-constructible, and
            // relies on a single-argument pop(T&) overload that is not
            // declared in this class (it exists on the derived ringbuffers) —
            // confirm this instantiates for non-trivially-destructible T.

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            write_index_.store(0, memory_order_relaxed);
            read_index_.store(0, memory_order_release);
        }
    }

    /** Check if the ringbuffer is empty
     *
     * \return true, if the ringbuffer is empty, false otherwise
     * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
     * */
    bool empty(void)
    {
        return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
    }

    /**
     * \return true, if implementation is lock-free.
     *
     * */
    bool is_lock_free(void) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    // empty iff both indices coincide (one slot is always kept free, so this
    // never collides with the "full" condition)
    bool empty(size_t write_index, size_t read_index)
    {
        return write_index == read_index;
    }

    /* copy [first, last) to out, destroying the source elements; for
     * trivially destructible T this degenerates to std::copy */
    template< class OutputIterator >
    OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
    {
        if (boost::has_trivial_destructor<T>::value) {
            return std::copy(first, last, out); // will use memcpy if possible
        } else {
            for (; first != last; ++first, ++out) {
                *out = *first;
                first->~T();
            }
            return out;
        }
    }

    /* apply functor to each element of [first, last), then destroy it */
    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }

    /* const-functor overload; intentionally duplicated (no forwarding) */
    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor const & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }
};
421 
/** Ringbuffer whose capacity is fixed at compile time.
 *
 *  Storage is an in-object aligned byte array sized for MaxSize + 1 elements
 *  (the extra slot is the one ringbuffer_base keeps free to distinguish full
 *  from empty).  All operations forward to ringbuffer_base with this
 *  object's storage and size.
 */
template <typename T, std::size_t MaxSize>
class compile_time_sized_ringbuffer:
    public ringbuffer_base<T>
{
    typedef std::size_t size_type;
    // +1: one slot is reserved by the base-class full/empty protocol
    static const std::size_t max_size = MaxSize + 1;

    // raw, properly aligned storage; elements are constructed/destroyed
    // in place by the base class
    typedef typename boost::aligned_storage<max_size * sizeof(T),
                                            boost::alignment_of<T>::value
                                           >::type storage_type;

    storage_type storage_;

    T * data()
    {
        return static_cast<T*>(storage_.address());
    }

    const T * data() const
    {
        return static_cast<const T*>(storage_.address());
    }

protected:
    // internal slot count (user capacity + 1), matching the base protocol
    size_type max_number_of_elements() const
    {
        return max_size;
    }

public:
    bool push(T const & t)
    {
        return ringbuffer_base<T>::push(t, data(), max_size);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return ringbuffer_base<T>::consume_one(f, data(), max_size);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return ringbuffer_base<T>::consume_one(f, data(), max_size);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return ringbuffer_base<T>::consume_all(f, data(), max_size);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return ringbuffer_base<T>::consume_all(f, data(), max_size);
    }

    size_type push(T const * t, size_type size)
    {
        return ringbuffer_base<T>::push(t, size, data(), max_size);
    }

    // convenience overload: deduces the element count from the array bound
    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return ringbuffer_base<T>::push(begin, end, data(), max_size);
    }

    size_type pop(T * ret, size_type size)
    {
        return ringbuffer_base<T>::pop(ret, size, data(), max_size);
    }

    template <typename OutputIterator>
    size_type pop_to_output_iterator(OutputIterator it)
    {
        return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
    }

    const T& front(void) const
    {
        return ringbuffer_base<T>::front(data());
    }

    T& front(void)
    {
        return ringbuffer_base<T>::front(data());
    }
};
519 
/** Ringbuffer whose capacity is chosen at run time.
 *
 *  Storage is allocated in the constructors via the user-supplied allocator
 *  and released in the destructor.  Deriving privately from Alloc enables
 *  the empty-base optimization for stateless allocators.  As in the
 *  compile-time variant, one extra slot is allocated for the base class's
 *  full/empty protocol.
 */
template <typename T, typename Alloc>
class runtime_sized_ringbuffer:
    public ringbuffer_base<T>,
    private Alloc
{
    typedef std::size_t size_type;
    size_type max_elements_;
#ifdef BOOST_NO_CXX11_ALLOCATOR
    typedef typename Alloc::pointer pointer;
#else
    typedef std::allocator_traits<Alloc> allocator_traits;
    typedef typename allocator_traits::pointer pointer;
#endif
    pointer array_;

protected:
    // internal slot count (user capacity + 1)
    size_type max_number_of_elements() const
    {
        return max_elements_;
    }

public:
    explicit runtime_sized_ringbuffer(size_type max_elements):
        max_elements_(max_elements + 1)
    {
#ifdef BOOST_NO_CXX11_ALLOCATOR
        array_ = Alloc::allocate(max_elements_);
#else
        Alloc& alloc = *this;
        array_ = allocator_traits::allocate(alloc, max_elements_);
#endif
    }

    // construct from a rebound allocator (allocator for some unrelated U)
    template <typename U>
    runtime_sized_ringbuffer(typename boost::allocator_rebind<Alloc, U>::type const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
#ifdef BOOST_NO_CXX11_ALLOCATOR
        array_ = Alloc::allocate(max_elements_);
#else
        Alloc& allocator = *this;
        array_ = allocator_traits::allocate(allocator, max_elements_);
#endif
    }

    runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
#ifdef BOOST_NO_CXX11_ALLOCATOR
        array_ = Alloc::allocate(max_elements_);
#else
        Alloc& allocator = *this;
        array_ = allocator_traits::allocate(allocator, max_elements_);
#endif
    }

    ~runtime_sized_ringbuffer(void)
    {
        // destroy all remaining items
        // NOTE(review): requires T to be default-constructible and assignable;
        // remaining elements are popped (and thus destroyed) one by one
        T out;
        while (pop(&out, 1)) {}

#ifdef BOOST_NO_CXX11_ALLOCATOR
        Alloc::deallocate(array_, max_elements_);
#else
        Alloc& allocator = *this;
        allocator_traits::deallocate(allocator, array_, max_elements_);
#endif
    }

    bool push(T const & t)
    {
        // &*array_ converts a (possibly fancy) allocator pointer to T*
        return ringbuffer_base<T>::push(t, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    size_type push(T const * t, size_type size)
    {
        return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
    }

    // convenience overload: deduces the element count from the array bound
    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_);
    }

    size_type pop(T * ret, size_type size)
    {
        return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_);
    }

    template <typename OutputIterator>
    size_type pop_to_output_iterator(OutputIterator it)
    {
        return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_);
    }

    const T& front(void) const
    {
        return ringbuffer_base<T>::front(&*array_);
    }

    T& front(void)
    {
        return ringbuffer_base<T>::front(&*array_);
    }
};
657 
/** Metafunction selecting the ringbuffer implementation for spsc_queue.
 *
 *  Binds the user-supplied policy options against ringbuffer_signature and
 *  extracts the capacity<> and allocator<> arguments.  If a compile-time
 *  capacity was given, the result is compile_time_sized_ringbuffer;
 *  otherwise runtime_sized_ringbuffer with the extracted allocator.
 */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, typename A0, typename A1>
#else
template <typename T, typename ...Options>
#endif
struct make_ringbuffer
{
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
#else
    typedef typename ringbuffer_signature::bind<Options...>::type bound_args;
#endif

    typedef extract_capacity<bound_args> extract_capacity_t;

    static const bool runtime_sized = !extract_capacity_t::has_capacity;
    static const size_t capacity    =  extract_capacity_t::capacity;

    typedef extract_allocator<bound_args, T> extract_allocator_t;
    typedef typename extract_allocator_t::type allocator;

    // allocator argument is only sane, for run-time sized ringbuffers:
    // reject allocator<> combined with a compile-time capacity<>
    BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
                                  mpl::bool_<!extract_allocator_t::has_allocator>,
                                  mpl::true_
                                 >::type::value));

    typedef typename mpl::if_c<runtime_sized,
                               runtime_sized_ringbuffer<T, allocator>,
                               compile_time_sized_ringbuffer<T, capacity>
                              >::type ringbuffer_type;
};
690 
691 
692 } /* namespace detail */
693 
694 
695 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
696  *
697  *  \b Policies:
698  *  - \c boost::lockfree::capacity<>, optional <br>
699  *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
700  *
701  *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
702  *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
703  *    to be sized at run-time
704  *
705  *  \b Requirements:
706  *  - T must have a default constructor
707  *  - T must be copyable
708  * */
709 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
710 template <typename T, class A0, class A1>
711 #else
712 template <typename T, typename ...Options>
713 #endif
714 class spsc_queue:
715 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
716     public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
717 #else
718     public detail::make_ringbuffer<T, Options...>::ringbuffer_type
719 #endif
720 {
private:

#ifndef BOOST_DOXYGEN_INVOKED

// resolve the concrete base ringbuffer (compile-time or run-time sized)
// from the policy options
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
    static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
    typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
#else
    typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type;
    static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized;
    typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg;
#endif


    struct implementation_defined
    {
        typedef allocator_arg allocator;
        typedef std::size_t size_type;
    };
#endif

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /** Constructs a spsc_queue
     *
     *  \pre spsc_queue must be configured to be sized at compile-time
     */
    spsc_queue(void)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(!runtime_sized);
    }

    /** Constructs a spsc_queue with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at compile-time
     *
     *  \note This is just for API compatibility: an allocator isn't actually needed
     */
    template <typename U>
    explicit spsc_queue(typename boost::allocator_rebind<allocator, U>::type const &)
    {
        BOOST_STATIC_ASSERT(!runtime_sized);
    }

    /** Constructs a spsc_queue with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at compile-time
     *
     *  \note This is just for API compatibility: an allocator isn't actually needed
     */
    explicit spsc_queue(allocator const &)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(!runtime_sized);
    }

    /** Constructs a spsc_queue for element_count elements
     *
     *  \pre spsc_queue must be configured to be sized at run-time
     */
    explicit spsc_queue(size_type element_count):
        base_type(element_count)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(runtime_sized);
    }

    /** Constructs a spsc_queue for element_count elements with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at run-time
     */
    template <typename U>
    spsc_queue(size_type element_count, typename boost::allocator_rebind<allocator, U>::type const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_STATIC_ASSERT(runtime_sized);
    }

    /** Constructs a spsc_queue for element_count elements with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at run-time
     */
    spsc_queue(size_type element_count, allocator_arg const & alloc):
        base_type(alloc, element_count)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(runtime_sized);
    }

    /** Pushes object t to the ringbuffer.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \post object will be pushed to the spsc_queue, unless it is full.
     * \return true, if the push operation is successful.
     *
     * \note Thread-safe and wait-free
     * */
    bool push(T const & t)
    {
        return base_type::push(t);
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be discarded.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    bool pop ()
    {
        // discard the element: consume_noop destroys it without copying
        detail::consume_noop consume_functor;
        return consume_one( consume_functor );
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be copied to ret.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    template <typename U>
    typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
    pop (U & ret)
    {
        // SFINAE-restricted to targets a T can be converted to
        detail::consume_via_copy<U> consume_functor(ret);
        return consume_one( consume_functor );
    }

    /** Pushes as many objects from the array t as there is space.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    size_type push(T const * t, size_type size)
    {
        return base_type::push(t, size);
    }
873 
874     /** Pushes as many objects from the array t as there is space available.
875      *
876      * \pre only one thread is allowed to push data to the spsc_queue
877      * \return number of pushed items
878      *
879      * \note Thread-safe and wait-free
880      */
881     template <size_type size>
push(T const (& t)[size])882     size_type push(T const (&t)[size])
883     {
884         return push(t, size);
885     }
886 
887     /** Pushes as many objects from the range [begin, end) as there is space .
888      *
889      * \pre only one thread is allowed to push data to the spsc_queue
890      * \return iterator to the first element, which has not been pushed
891      *
892      * \note Thread-safe and wait-free
893      */
894     template <typename ConstIterator>
push(ConstIterator begin,ConstIterator end)895     ConstIterator push(ConstIterator begin, ConstIterator end)
896     {
897         return base_type::push(begin, end);
898     }
899 
900     /** Pops a maximum of size objects from ringbuffer.
901      *
902      * \pre only one thread is allowed to pop data to the spsc_queue
903      * \return number of popped items
904      *
905      * \note Thread-safe and wait-free
906      * */
pop(T * ret,size_type size)907     size_type pop(T * ret, size_type size)
908     {
909         return base_type::pop(ret, size);
910     }
911 
912     /** Pops a maximum of size objects from spsc_queue.
913      *
914      * \pre only one thread is allowed to pop data to the spsc_queue
915      * \return number of popped items
916      *
917      * \note Thread-safe and wait-free
918      * */
919     template <size_type size>
pop(T (& ret)[size])920     size_type pop(T (&ret)[size])
921     {
922         return pop(ret, size);
923     }
924 
925     /** Pops objects to the output iterator it
926      *
927      * \pre only one thread is allowed to pop data to the spsc_queue
928      * \return number of popped items
929      *
930      * \note Thread-safe and wait-free
931      * */
932     template <typename OutputIterator>
933     typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
pop(OutputIterator it)934     pop(OutputIterator it)
935     {
936         return base_type::pop_to_output_iterator(it);
937     }
938 
939     /** consumes one element via a functor
940      *
941      *  pops one element from the queue and applies the functor on this object
942      *
943      * \returns true, if one element was consumed
944      *
945      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
946      * */
947     template <typename Functor>
consume_one(Functor & f)948     bool consume_one(Functor & f)
949     {
950         return base_type::consume_one(f);
951     }
952 
953     /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
954     template <typename Functor>
consume_one(Functor const & f)955     bool consume_one(Functor const & f)
956     {
957         return base_type::consume_one(f);
958     }
959 
960     /** consumes all elements via a functor
961      *
962      * sequentially pops all elements from the queue and applies the functor on each object
963      *
964      * \returns number of elements that are consumed
965      *
966      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
967      * */
968     template <typename Functor>
consume_all(Functor & f)969     size_type consume_all(Functor & f)
970     {
971         return base_type::consume_all(f);
972     }
973 
974     /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
975     template <typename Functor>
consume_all(Functor const & f)976     size_type consume_all(Functor const & f)
977     {
978         return base_type::consume_all(f);
979     }
980 
981     /** get number of elements that are available for read
982      *
983      * \return number of available elements that can be popped from the spsc_queue
984      *
985      * \note Thread-safe and wait-free, should only be called from the consumer thread
986      * */
read_available() const987     size_type read_available() const
988     {
989         return base_type::read_available(base_type::max_number_of_elements());
990     }
991 
992     /** get write space to write elements
993      *
994      * \return number of elements that can be pushed to the spsc_queue
995      *
996      * \note Thread-safe and wait-free, should only be called from the producer thread
997      * */
write_available() const998     size_type write_available() const
999     {
1000         return base_type::write_available(base_type::max_number_of_elements());
1001     }
1002 
1003     /** get reference to element in the front of the queue
1004      *
1005      * Availability of front element can be checked using read_available().
1006      *
1007      * \pre only a consuming thread is allowed to check front element
1008      * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
1009      * \return reference to the first element in the queue
1010      *
1011      * \note Thread-safe and wait-free
1012      */
front() const1013     const T& front() const
1014     {
1015         BOOST_ASSERT(read_available() > 0);
1016         return base_type::front();
1017     }
1018 
1019     /// \copydoc boost::lockfree::spsc_queue::front() const
front()1020     T& front()
1021     {
1022         BOOST_ASSERT(read_available() > 0);
1023         return base_type::front();
1024     }
1025 
1026     /** reset the ringbuffer
1027      *
1028      * \note Not thread-safe
1029      * */
reset(void)1030     void reset(void)
1031     {
1032         if ( !boost::has_trivial_destructor<T>::value ) {
1033             // make sure to call all destructors!
1034 
1035             T dummy_element;
1036             while (pop(dummy_element))
1037             {}
1038         } else {
1039             base_type::write_index_.store(0, memory_order_relaxed);
1040             base_type::read_index_.store(0, memory_order_release);
1041         }
1042    }
1043 };
1044 
1045 } /* namespace lockfree */
1046 } /* namespace boost */
1047 
1048 
1049 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */
1050