1 2 // Copyright Oliver Kowalke 2016. 3 // Distributed under the Boost Software License, Version 1.0. 4 // (See accompanying file LICENSE_1_0.txt or copy at 5 // http://www.boost.org/LICENSE_1_0.txt) 6 // 7 // based on Dmitry Vyukov's MPMC queue 8 // (http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue) 9 10 #ifndef BUFFERED_CHANNEL_H 11 #define BUFFERED_CHANNEL_H 12 13 #include <atomic> 14 #include <chrono> 15 #include <condition_variable> 16 #include <cstddef> 17 #include <cstdint> 18 #include <memory> 19 #include <mutex> 20 #include <stdexcept> 21 #include <type_traits> 22 23 #include <boost/assert.hpp> 24 #include <boost/config.hpp> 25 #include <boost/fiber/detail/config.hpp> 26 27 enum class channel_op_status { 28 success = 0, 29 empty, 30 full, 31 closed, 32 timeout 33 }; 34 35 template< typename T > 36 class buffered_channel { 37 public: 38 typedef T value_type; 39 40 private: 41 typedef typename std::aligned_storage< sizeof( T), alignof( T) >::type storage_type; 42 43 struct alignas(cache_alignment) slot { 44 std::atomic< std::size_t > cycle{ 0 }; 45 storage_type storage{}; 46 47 slot() = default; 48 }; 49 50 // procuder cacheline 51 alignas(cache_alignment) std::atomic< std::size_t > producer_idx_{ 0 }; 52 // consumer cacheline 53 alignas(cache_alignment) std::atomic< std::size_t > consumer_idx_{ 0 }; 54 // shared write cacheline 55 alignas(cache_alignment) std::atomic_bool closed_{ false }; 56 mutable std::mutex mtx_{}; 57 std::condition_variable not_full_cnd_{}; 58 std::condition_variable not_empty_cnd_{}; 59 // shared read cacheline 60 alignas(cache_alignment) slot * slots_{ nullptr }; 61 std::size_t capacity_; 62 char pad_[cacheline_length]; 63 std::size_t waiting_consumer_{ 0 }; 64 is_full_()65 bool is_full_() { 66 std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) }; 67 return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx); 68 } 69 is_empty_()70 bool is_empty_() { 71 std::size_t idx{ consumer_idx_.load( std::memory_order_relaxed) }; 72 return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx + 1); 73 } 74 75 template< typename ValueType > try_push_(ValueType && value)76 channel_op_status try_push_( ValueType && value) { 77 slot * s{ nullptr }; 78 std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) }; 79 for (;;) { 80 s = & slots_[idx & (capacity_ - 1)]; 81 std::size_t cycle{ s->cycle.load( std::memory_order_acquire) }; 82 std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx) }; 83 if ( 0 == diff) { 84 if ( producer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) { 85 break; 86 } 87 } else if ( 0 > diff) { 88 return channel_op_status::full; 89 } else { 90 idx = producer_idx_.load( std::memory_order_relaxed); 91 } 92 } 93 ::new ( static_cast< void * >( std::addressof( s->storage) ) ) value_type( std::forward< ValueType >( value) ); 94 s->cycle.store( idx + 1, std::memory_order_release); 95 return channel_op_status::success; 96 } 97 try_value_pop_(slot * & s,std::size_t & idx)98 channel_op_status try_value_pop_( slot *& s, std::size_t & idx) { 99 idx = consumer_idx_.load( std::memory_order_relaxed); 100 for (;;) { 101 s = & slots_[idx & (capacity_ - 1)]; 102 std::size_t cycle = s->cycle.load( std::memory_order_acquire); 103 std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx + 1) }; 104 if ( 0 == diff) { 105 if ( consumer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) { 106 break; 107 } 108 } else if ( 0 > diff) { 109 return channel_op_status::empty; 110 } else { 111 idx = consumer_idx_.load( std::memory_order_relaxed); 112 } 113 } 114 // incrementing the slot cycle must be deferred till the value has been consumed 115 // slot cycle tells procuders that the cell can be re-used (store new value) 116 return channel_op_status::success; 117 } 118 try_pop_(value_type & value)119 channel_op_status try_pop_( value_type & value) { 120 slot * s{ nullptr }; 121 std::size_t idx{ 0 }; 122 channel_op_status status{ try_value_pop_( s, idx) }; 123 if ( channel_op_status::success == status) { 124 value = std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ); 125 s->cycle.store( idx + capacity_, std::memory_order_release); 126 } 127 return status; 128 } 129 130 public: buffered_channel(std::size_t capacity)131 explicit buffered_channel( std::size_t capacity) : 132 capacity_{ capacity } { 133 if ( 0 == capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) { 134 throw std::runtime_error{ "boost fiber: buffer capacity is invalid" }; 135 } 136 slots_ = new slot[capacity_](); 137 for ( std::size_t i = 0; i < capacity_; ++i) { 138 slots_[i].cycle.store( i, std::memory_order_relaxed); 139 } 140 } 141 ~buffered_channel()142 ~buffered_channel() { 143 close(); 144 for (;;) { 145 slot * s{ nullptr }; 146 std::size_t idx{ 0 }; 147 if ( channel_op_status::success == try_value_pop_( s, idx) ) { 148 reinterpret_cast< value_type * >( std::addressof( s->storage) )->~value_type(); 149 s->cycle.store( idx + capacity_, std::memory_order_release); 150 } else { 151 break; 152 } 153 } 154 delete [] slots_; 155 } 156 157 buffered_channel( buffered_channel const&) = delete; 158 buffered_channel & operator=( buffered_channel const&) = delete; 159 is_closed() const160 bool is_closed() const noexcept { 161 return closed_.load( std::memory_order_acquire); 162 } 163 close()164 void close() noexcept { 165 std::unique_lock< std::mutex > lk{ mtx_ }; 166 closed_.store( true, std::memory_order_release); 167 not_full_cnd_.notify_all(); 168 not_empty_cnd_.notify_all(); 169 } 170 push(value_type const & value)171 channel_op_status push( value_type const& value) { 172 for (;;) { 173 if ( is_closed() ) { 174 return channel_op_status::closed; 175 } 176 channel_op_status status{ try_push_( value) }; 177 if ( channel_op_status::success == status) { 178 std::unique_lock< std::mutex > lk{ mtx_ }; 179 if ( 0 < waiting_consumer_) { 180 not_empty_cnd_.notify_one(); 181 } 182 return status; 183 } else if ( channel_op_status::full == status) { 184 std::unique_lock< std::mutex > lk{ mtx_ }; 185 if ( is_closed() ) { 186 return channel_op_status::closed; 187 } 188 if ( ! is_full_() ) { 189 continue; 190 } 191 not_full_cnd_.wait( lk, [this]{ return is_closed() || ! is_full_(); }); 192 } else { 193 BOOST_ASSERT( channel_op_status::closed == status); 194 return status; 195 } 196 } 197 } 198 value_pop()199 value_type value_pop() { 200 for (;;) { 201 slot * s{ nullptr }; 202 std::size_t idx{ 0 }; 203 channel_op_status status{ try_value_pop_( s, idx) }; 204 if ( channel_op_status::success == status) { 205 value_type value{ std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ) }; 206 s->cycle.store( idx + capacity_, std::memory_order_release); 207 not_full_cnd_.notify_one(); 208 return std::move( value); 209 } else if ( channel_op_status::empty == status) { 210 std::unique_lock< std::mutex > lk{ mtx_ }; 211 ++waiting_consumer_; 212 if ( is_closed() ) { 213 throw std::runtime_error{ "boost fiber: channel is closed" }; 214 } 215 if ( ! is_empty_() ) { 216 continue; 217 } 218 not_empty_cnd_.wait( lk, [this](){ return is_closed() || ! is_empty_(); }); 219 --waiting_consumer_; 220 } else { 221 BOOST_ASSERT( channel_op_status::closed == status); 222 throw std::runtime_error{ "boost fiber: channel is closed" }; 223 } 224 } 225 } 226 }; 227 228 #endif // BUFFERED_CHANNEL_H 229