1 /*
2 * Copyright Andrey Semashev 2007 - 2015.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7 /*!
8 * \file threadsafe_queue.cpp
9 * \author Andrey Semashev
10 * \date 05.11.2010
11 *
 * \brief  This file is part of the Boost.Log library implementation; see the library documentation
 *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
14 *
15 * The implementation is based on algorithms published in the "Simple, Fast,
16 * and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" article
17 * in PODC96 by Maged M. Michael and Michael L. Scott. Pseudocode is available here:
18 * http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
19 *
 * The lock-free version of the mentioned algorithms contains a race condition and was therefore
 * not included here.
22 */
23
24 #include <boost/log/detail/config.hpp>
25 #include <boost/log/detail/threadsafe_queue.hpp>
26
27 #ifndef BOOST_LOG_NO_THREADS
28
29 #include <new>
30 #include <boost/assert.hpp>
31 #include <boost/throw_exception.hpp>
32 #include <boost/align/aligned_alloc.hpp>
33 #include <boost/type_traits/alignment_of.hpp>
34 #include <boost/log/detail/adaptive_mutex.hpp>
35 #include <boost/log/detail/locks.hpp>
36 #include <boost/log/detail/header.hpp>
37
38 namespace boost {
39
40 BOOST_LOG_OPEN_NAMESPACE
41
42 namespace aux {
43
44 //! Generic queue implementation with two locks
45 class threadsafe_queue_impl_generic :
46 public threadsafe_queue_impl
47 {
48 private:
49 //! Mutex type to be used
50 typedef adaptive_mutex mutex_type;
51
52 /*!
53 * A structure that contains a pointer to the node and the associated mutex.
54 * The alignment below allows to eliminate false sharing, it should not be less than CPU cache line size.
55 */
BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE)56 struct BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE) pointer
57 {
58 //! Pointer to the either end of the queue
59 node_base* node;
60 //! Lock for access synchronization
61 mutex_type mutex;
62 // 128 bytes padding is chosen to mitigate false sharing for NetBurst CPUs, which load two cache lines in one go.
63 unsigned char padding[128U - (sizeof(node_base*) + sizeof(mutex_type)) % 128U];
64 };
65
66 private:
67 //! Pointer to the beginning of the queue
68 pointer m_Head;
69 //! Pointer to the end of the queue
70 pointer m_Tail;
71
72 public:
threadsafe_queue_impl_generic(node_base * first_node)73 explicit threadsafe_queue_impl_generic(node_base* first_node)
74 {
75 set_next(first_node, NULL);
76 m_Head.node = m_Tail.node = first_node;
77 }
78
~threadsafe_queue_impl_generic()79 ~threadsafe_queue_impl_generic() BOOST_OVERRIDE
80 {
81 }
82
reset_last_node()83 node_base* reset_last_node() BOOST_OVERRIDE
84 {
85 BOOST_ASSERT(m_Head.node == m_Tail.node);
86 node_base* p = m_Head.node;
87 m_Head.node = m_Tail.node = NULL;
88 return p;
89 }
90
unsafe_empty()91 bool unsafe_empty() BOOST_OVERRIDE
92 {
93 return m_Head.node == m_Tail.node;
94 }
95
push(node_base * p)96 void push(node_base* p) BOOST_OVERRIDE
97 {
98 set_next(p, NULL);
99 exclusive_lock_guard< mutex_type > _(m_Tail.mutex);
100 set_next(m_Tail.node, p);
101 m_Tail.node = p;
102 }
103
try_pop(node_base * & node_to_free,node_base * & node_with_value)104 bool try_pop(node_base*& node_to_free, node_base*& node_with_value) BOOST_OVERRIDE
105 {
106 exclusive_lock_guard< mutex_type > _(m_Head.mutex);
107 node_base* next = get_next(m_Head.node);
108 if (next)
109 {
110 // We have a node to pop
111 node_to_free = m_Head.node;
112 node_with_value = m_Head.node = next;
113 return true;
114 }
115 else
116 return false;
117 }
118
119 private:
set_next(node_base * p,node_base * next)120 BOOST_FORCEINLINE static void set_next(node_base* p, node_base* next)
121 {
122 p->next.data[0] = next;
123 }
get_next(node_base * p)124 BOOST_FORCEINLINE static node_base* get_next(node_base* p)
125 {
126 return static_cast< node_base* >(p->next.data[0]);
127 }
128
129 // Copying and assignment are closed
130 BOOST_DELETED_FUNCTION(threadsafe_queue_impl_generic(threadsafe_queue_impl_generic const&))
131 BOOST_DELETED_FUNCTION(threadsafe_queue_impl_generic& operator= (threadsafe_queue_impl_generic const&))
132 };
133
create(node_base * first_node)134 BOOST_LOG_API threadsafe_queue_impl* threadsafe_queue_impl::create(node_base* first_node)
135 {
136 return new threadsafe_queue_impl_generic(first_node);
137 }
138
operator new(std::size_t size)139 BOOST_LOG_API void* threadsafe_queue_impl::operator new (std::size_t size)
140 {
141 void* p = alignment::aligned_alloc(BOOST_LOG_CPU_CACHE_LINE_SIZE, size);
142 if (BOOST_UNLIKELY(!p))
143 BOOST_THROW_EXCEPTION(std::bad_alloc());
144 return p;
145 }
146
operator delete(void * p,std::size_t)147 BOOST_LOG_API void threadsafe_queue_impl::operator delete (void* p, std::size_t)
148 {
149 alignment::aligned_free(p);
150 }
151
152 } // namespace aux
153
154 BOOST_LOG_CLOSE_NAMESPACE // namespace log
155
156 } // namespace boost
157
158 #include <boost/log/detail/footer.hpp>
159
160 #endif // BOOST_LOG_NO_THREADS
161