//  lock-free queue from
//  Michael, M. M. and Scott, M. L.,
//  "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
//
//  Copyright (C) 2008-2013 Tim Blechmann
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_FIFO_HPP_INCLUDED
#define BOOST_LOCKFREE_FIFO_HPP_INCLUDED

#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/core/allocator_access.hpp>
#include <boost/type_traits/has_trivial_assign.hpp>
#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY & BOOST_ALIGNMENT

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/freelist.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/tagged_ptr.hpp>

#include <boost/lockfree/lockfree_forward.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif


#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#endif

#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(push)
#pragma warning(disable:488) // template parameter unused in declaring parameter types,
                             // gets erroneously triggered by the queue constructor that
                             // takes an allocator of another type and rebinds it
#endif


namespace boost    {
namespace lockfree {
namespace detail   {

typedef parameter::parameters<boost::parameter::optional<tag::allocator>,
                              boost::parameter::optional<tag::capacity>
                             > queue_signature;

} /* namespace detail */


/** The queue class provides a multi-writer/multi-reader queue; pushing and popping are lock-free,
 *  but construction and destruction have to be synchronized. It uses a freelist for memory management:
 *  freed nodes are pushed to the freelist and are not returned to the OS until the queue is destroyed.
 *
 *  \b Policies:
 *  - \ref boost::lockfree::fixed_sized, defaults to \c boost::lockfree::fixed_sized<false> \n
 *    Can be used to completely disable dynamic memory allocations during push in order to ensure lock-free behavior. \n
 *    If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and are addressed
 *    by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index
 *    type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way
 *    to achieve lock-freedom.
 *
 *  - \ref boost::lockfree::capacity, optional \n
 *    If this template argument is passed to the options, the size of the queue is set at compile time.\n
 *    This option implies \c fixed_sized<true>
 *
 *  - \ref boost::lockfree::allocator, defaults to \c boost::lockfree::allocator<std::allocator<void>> \n
 *    Specifies the allocator that is used for the internal freelist
 *
 *  \b Requirements:
 *   - T must have a copy constructor
 *   - T must have a trivial assignment operator
 *   - T must have a trivial destructor
 *
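 *  \b Example (a minimal usage sketch; the element type and sizes are illustrative):
 *  \code
 *  boost::lockfree::queue<int> q(128); // pre-allocate 128 freelist nodes
 *  q.push(42);                         // lock-free; may allocate if the pool is exhausted
 *  int value;
 *  if (q.pop(value)) {
 *      // value == 42
 *  }
 *  \endcode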
 * */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, class A0, class A1, class A2>
#else
template <typename T, typename ...Options>
#endif
class queue
{
private:
#ifndef BOOST_DOXYGEN_INVOKED

#ifdef BOOST_HAS_TRIVIAL_DESTRUCTOR
    BOOST_STATIC_ASSERT((boost::has_trivial_destructor<T>::value));
#endif

#ifdef BOOST_HAS_TRIVIAL_ASSIGN
    BOOST_STATIC_ASSERT((boost::has_trivial_assign<T>::value));
#endif

#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;
#else
    typedef typename detail::queue_signature::bind<Options...>::type bound_args;
#endif

    static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
    static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
    static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
    static const bool node_based = !(has_capacity || fixed_sized);
    static const bool compile_time_sized = has_capacity;

    struct BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES) node
    {
        typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
        typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

        node(T const & v, handle_type null_handle):
            data(v)
        {
            /* increment tag to avoid ABA problem */
            tagged_node_handle old_next = next.load(memory_order_relaxed);
            tagged_node_handle new_next (null_handle, old_next.get_next_tag());
            next.store(new_next, memory_order_release);
        }

        node (handle_type null_handle):
            next(tagged_node_handle(null_handle, 0))
        {}

        node(void)
        {}

        atomic<tagged_node_handle> next;
        T data;
    };

    typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
    typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
    typedef typename pool_t::tagged_node_handle tagged_node_handle;
    typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

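    /* The queue is initialized with a single dummy node that both head_ and
     * tail_ point to, as in the original Michael & Scott algorithm. */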
    void initialize(void)
    {
        node * n = pool.template construct<true, false>(pool.null_handle());
        tagged_node_handle dummy_node(pool.get_handle(n), 0);
        head_.store(dummy_node, memory_order_relaxed);
        tail_.store(dummy_node, memory_order_release);
    }

    struct implementation_defined
    {
        typedef node_allocator allocator;
        typedef std::size_t size_type;
    };

#endif

    BOOST_DELETED_FUNCTION(queue(queue const&))
    BOOST_DELETED_FUNCTION(queue& operator= (queue const&))

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /**
     * \return true, if the implementation is lock-free.
     *
     * \warning It only checks whether the queue head, the queue tail, and the freelist can be modified in a lock-free manner.
     *       On most platforms, the whole implementation is lock-free if this is true. Using C++11-style atomics, there is
     *       no way to provide a completely accurate implementation, because one would have to test every internal
     *       node, which is impossible if further nodes will be allocated from the operating system.
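     *
     *  \code
     *  boost::lockfree::queue<int> q(64);
     *  assert(q.is_lock_free()); // e.g. verify lock-freedom once at startup
     *  \endcode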
     * */
    bool is_lock_free (void) const
    {
        return head_.is_lock_free() && tail_.is_lock_free() && pool.is_lock_free();
    }

    /** Construct a fixed-sized queue
     *
     *  \pre Must specify a capacity<> argument
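     *
     *  \code
     *  // compile-time capacity (sketch; the capacity value is illustrative)
     *  boost::lockfree::queue<int, boost::lockfree::capacity<1024> > q;
     *  \endcode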
     * */
    queue(void):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), capacity)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a fixed-sized queue with a custom allocator
     *
     *  \pre Must specify a capacity<> argument
     * */
    template <typename U>
    explicit queue(typename boost::allocator_rebind<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        BOOST_STATIC_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a fixed-sized queue with a custom allocator
     *
     *  \pre Must specify a capacity<> argument
     * */
    explicit queue(allocator const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a variable-sized queue
     *
     *  Allocate n nodes initially for the freelist
     *
     *  \pre Must \b not specify a capacity<> argument
     * */
    explicit queue(size_type n):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), n + 1)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(!has_capacity);
        initialize();
    }

    /** Construct a variable-sized queue with a custom allocator
     *
     *  Allocate n nodes initially for the freelist
     *
     *  \pre Must \b not specify a capacity<> argument
     * */
    template <typename U>
    queue(size_type n, typename boost::allocator_rebind<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, n + 1)
    {
        BOOST_STATIC_ASSERT(!has_capacity);
        initialize();
    }

    /** \copydoc boost::lockfree::stack::reserve
     * */
    void reserve(size_type n)
    {
        pool.template reserve<true>(n);
    }

    /** \copydoc boost::lockfree::stack::reserve_unsafe
     * */
    void reserve_unsafe(size_type n)
    {
        pool.template reserve<false>(n);
    }

    /** Destroys the queue and frees all nodes from the freelist.
     * */
    ~queue(void)
    {
        T dummy;
        while(unsynchronized_pop(dummy))
        {}

        pool.template destruct<false>(head_.load(memory_order_relaxed));
    }

    /** Check if the queue is empty
     *
     * \return true, if the queue is empty, false otherwise
     * \note The result is only accurate if no other thread modifies the queue. Therefore it is rarely practical to use this
     *       value in program logic.
     * */
    bool empty(void) const
    {
        return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if an internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe. If the internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
     *                    from the OS. This may not be lock-free.
     * */
    bool push(T const & t)
    {
        return do_push<false>(t);
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if an internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe and non-blocking. If the internal memory pool is exhausted, the operation will fail
     * \throws if memory allocator throws
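     *
     *  \code
     *  // back-pressure sketch; handle_full_queue() is a hypothetical handler
     *  int item = 23;
     *  while (!q.bounded_push(item))
     *      handle_full_queue();
     *  \endcode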
     * */
    bool bounded_push(T const & t)
    {
        return do_push<true>(t);
    }


private:
#ifndef BOOST_DOXYGEN_INVOKED
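    /* Michael & Scott enqueue: read tail and tail->next, re-check that tail
     * is still consistent, then either link the new node at the end with a
     * CAS on tail->next and swing tail_ forward, or help another thread by
     * advancing a lagging tail_. Any thread may complete the tail swing,
     * which is what keeps the operation lock-free. */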
    template <bool Bounded>
    bool do_push(T const & t)
    {
        node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
        handle_type node_handle = pool.get_handle(n);

        if (n == NULL)
            return false;

        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_acquire);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle tail2 = tail_.load(memory_order_acquire);
            if (BOOST_LIKELY(tail == tail2)) {
                if (next_ptr == 0) {
                    tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
                    if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
                        tagged_node_handle new_tail(node_handle, tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                        return true;
                    }
                }
                else {
                    tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);
                }
            }
        }
    }
#endif

public:

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if an internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Not thread-safe. If the internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
     *       from the OS. This may not be lock-free.
     * \throws if memory allocator throws
     * */
    bool unsynchronized_push(T const & t)
    {
        node * n = pool.template construct<false, false>(t, pool.null_handle());

        if (n == NULL)
            return false;

        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (next_ptr == 0) {
                tail_node->next.store(tagged_node_handle(pool.get_handle(n), next.get_next_tag()), memory_order_relaxed);
                tail_.store(tagged_node_handle(pool.get_handle(n), tail.get_next_tag()), memory_order_relaxed);
                return true;
            }
            else
                tail_.store(tagged_node_handle(pool.get_handle(next_ptr), tail.get_next_tag()), memory_order_relaxed);
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool pop (T & ret)
    {
        return pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible from T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
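     *
     *  \code
     *  boost::lockfree::queue<int> q(16);
     *  q.push(1);
     *  long value; // int is convertible to long
     *  q.pop(value);
     *  \endcode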
     * */
    template <typename U>
    bool pop (U & ret)
    {
        for (;;) {
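            /* Michael & Scott dequeue: snapshot head, tail and head->next,
             * re-check head for consistency, then either help a lagging tail
             * catch up or CAS head forward and reclaim the old dummy node. */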
            tagged_node_handle head = head_.load(memory_order_acquire);
            node * head_ptr = pool.get_pointer(head);

            tagged_node_handle tail = tail_.load(memory_order_acquire);
            tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle head2 = head_.load(memory_order_acquire);
            if (BOOST_LIKELY(head == head2)) {
                if (pool.get_handle(head) == pool.get_handle(tail)) {
                    if (next_ptr == 0)
                        return false;

                    tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);

                } else {
                    if (next_ptr == 0)
                        /* this check is not part of the original algorithm as published by Michael and Scott
                         *
                         * however, we reuse the tagged_ptr part for the freelist and clear the next part during node
                         * allocation, so we can observe a null pointer here.
                         * */
                        continue;
                    detail::copy_payload(next_ptr->data, ret);

                    tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                    if (head_.compare_exchange_weak(head, new_head)) {
                        pool.template destruct<true>(head);
                        return true;
                    }
                }
            }
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    bool unsynchronized_pop (T & ret)
    {
        return unsynchronized_pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible from T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    template <typename U>
    bool unsynchronized_pop (U & ret)
    {
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_relaxed);
            node * head_ptr = pool.get_pointer(head);
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            tagged_node_handle next = head_ptr->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (pool.get_handle(head) == pool.get_handle(tail)) {
                if (next_ptr == 0)
                    return false;

                tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                tail_.store(new_tail);
            } else {
                if (next_ptr == 0)
                    /* this check is not part of the original algorithm as published by Michael and Scott
                     *
                     * however, we reuse the tagged_ptr part for the freelist and clear the next part during node
                     * allocation, so we can observe a null pointer here.
                     * */
                    continue;
                detail::copy_payload(next_ptr->data, ret);
                tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                head_.store(new_head);
                pool.template destruct<false>(head);
                return true;
            }
        }
    }

    /** consumes one element via a functor
     *
     *  pops one element from the queue and applies the functor on this object
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
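     *
     *  \code
     *  // C++11 lambda shown for brevity; any callable taking a T works
     *  q.consume_one([](int value) { std::cout << value << '\n'; });
     *  \endcode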
     * */
    template <typename Functor>
    bool consume_one(Functor & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /// \copydoc boost::lockfree::queue::consume_one(Functor & rhs)
    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /** consumes all elements via a functor
     *
     * sequentially pops all elements from the queue and applies the functor on each object
     *
     * \returns number of elements that are consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
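     *
     *  \code
     *  // drain the queue; process() is a hypothetical consumer
     *  size_t drained = q.consume_all([](int value) { process(value); });
     *  \endcode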
     * */
    template <typename Functor>
    size_t consume_all(Functor & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }

    /// \copydoc boost::lockfree::queue::consume_all(Functor & rhs)
    template <typename Functor>
    size_t consume_all(Functor const & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }

private:
#ifndef BOOST_DOXYGEN_INVOKED
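    /* head_ and tail_ are padded to separate cache lines to avoid false
     * sharing between producers and consumers. */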
    atomic<tagged_node_handle> head_;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
    char padding1[padding_size];
    atomic<tagged_node_handle> tail_;
    char padding2[padding_size];

    pool_t pool;
#endif
};

} /* namespace lockfree */
} /* namespace boost */

#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(pop)
#endif

#if defined(_MSC_VER)
#pragma warning(pop)
#endif

#endif /* BOOST_LOCKFREE_FIFO_HPP_INCLUDED */