• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *          Copyright Andrey Semashev 2007 - 2015.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 /*!
8  * \file   threadsafe_queue.cpp
9  * \author Andrey Semashev
10  * \date   05.11.2010
11  *
12  * \brief  This header is the Boost.Log library implementation, see the library documentation
13  *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
14  *
15  * The implementation is based on algorithms published in the "Simple, Fast,
16  * and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" article
17  * in PODC96 by Maged M. Michael and Michael L. Scott. Pseudocode is available here:
18  * http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
19  *
20  * The lock-free version of the mentioned algorithms contain a race condition and therefore
21  * were not included here.
22  */
23 
24 #include <boost/log/detail/config.hpp>
25 #include <boost/log/detail/threadsafe_queue.hpp>
26 
27 #ifndef BOOST_LOG_NO_THREADS
28 
29 #include <new>
30 #include <boost/assert.hpp>
31 #include <boost/throw_exception.hpp>
32 #include <boost/align/aligned_alloc.hpp>
33 #include <boost/type_traits/alignment_of.hpp>
34 #include <boost/log/detail/adaptive_mutex.hpp>
35 #include <boost/log/detail/locks.hpp>
36 #include <boost/log/detail/header.hpp>
37 
38 namespace boost {
39 
40 BOOST_LOG_OPEN_NAMESPACE
41 
42 namespace aux {
43 
44 //! Generic queue implementation with two locks
45 class threadsafe_queue_impl_generic :
46     public threadsafe_queue_impl
47 {
48 private:
49     //! Mutex type to be used
50     typedef adaptive_mutex mutex_type;
51 
52     /*!
53      * A structure that contains a pointer to the node and the associated mutex.
54      * The alignment below allows to eliminate false sharing, it should not be less than CPU cache line size.
55      */
BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE)56     struct BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE) pointer
57     {
58         //! Pointer to the either end of the queue
59         node_base* node;
60         //! Lock for access synchronization
61         mutex_type mutex;
62         //  128 bytes padding is chosen to mitigate false sharing for NetBurst CPUs, which load two cache lines in one go.
63         unsigned char padding[128U - (sizeof(node_base*) + sizeof(mutex_type)) % 128U];
64     };
65 
66 private:
67     //! Pointer to the beginning of the queue
68     pointer m_Head;
69     //! Pointer to the end of the queue
70     pointer m_Tail;
71 
72 public:
threadsafe_queue_impl_generic(node_base * first_node)73     explicit threadsafe_queue_impl_generic(node_base* first_node)
74     {
75         set_next(first_node, NULL);
76         m_Head.node = m_Tail.node = first_node;
77     }
78 
~threadsafe_queue_impl_generic()79     ~threadsafe_queue_impl_generic() BOOST_OVERRIDE
80     {
81     }
82 
reset_last_node()83     node_base* reset_last_node() BOOST_OVERRIDE
84     {
85         BOOST_ASSERT(m_Head.node == m_Tail.node);
86         node_base* p = m_Head.node;
87         m_Head.node = m_Tail.node = NULL;
88         return p;
89     }
90 
unsafe_empty()91     bool unsafe_empty() BOOST_OVERRIDE
92     {
93         return m_Head.node == m_Tail.node;
94     }
95 
push(node_base * p)96     void push(node_base* p) BOOST_OVERRIDE
97     {
98         set_next(p, NULL);
99         exclusive_lock_guard< mutex_type > _(m_Tail.mutex);
100         set_next(m_Tail.node, p);
101         m_Tail.node = p;
102     }
103 
try_pop(node_base * & node_to_free,node_base * & node_with_value)104     bool try_pop(node_base*& node_to_free, node_base*& node_with_value) BOOST_OVERRIDE
105     {
106         exclusive_lock_guard< mutex_type > _(m_Head.mutex);
107         node_base* next = get_next(m_Head.node);
108         if (next)
109         {
110             // We have a node to pop
111             node_to_free = m_Head.node;
112             node_with_value = m_Head.node = next;
113             return true;
114         }
115         else
116             return false;
117     }
118 
119 private:
set_next(node_base * p,node_base * next)120     BOOST_FORCEINLINE static void set_next(node_base* p, node_base* next)
121     {
122         p->next.data[0] = next;
123     }
get_next(node_base * p)124     BOOST_FORCEINLINE static node_base* get_next(node_base* p)
125     {
126         return static_cast< node_base* >(p->next.data[0]);
127     }
128 
129     // Copying and assignment are closed
130     BOOST_DELETED_FUNCTION(threadsafe_queue_impl_generic(threadsafe_queue_impl_generic const&))
131     BOOST_DELETED_FUNCTION(threadsafe_queue_impl_generic& operator= (threadsafe_queue_impl_generic const&))
132 };
133 
create(node_base * first_node)134 BOOST_LOG_API threadsafe_queue_impl* threadsafe_queue_impl::create(node_base* first_node)
135 {
136     return new threadsafe_queue_impl_generic(first_node);
137 }
138 
operator new(std::size_t size)139 BOOST_LOG_API void* threadsafe_queue_impl::operator new (std::size_t size)
140 {
141     void* p = alignment::aligned_alloc(BOOST_LOG_CPU_CACHE_LINE_SIZE, size);
142     if (BOOST_UNLIKELY(!p))
143         BOOST_THROW_EXCEPTION(std::bad_alloc());
144     return p;
145 }
146 
operator delete(void * p,std::size_t)147 BOOST_LOG_API void threadsafe_queue_impl::operator delete (void* p, std::size_t)
148 {
149     alignment::aligned_free(p);
150 }
151 
152 } // namespace aux
153 
154 BOOST_LOG_CLOSE_NAMESPACE // namespace log
155 
156 } // namespace boost
157 
158 #include <boost/log/detail/footer.hpp>
159 
160 #endif // BOOST_LOG_NO_THREADS
161