//          Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)
//
// based on Dmitry Vyukov's MPMC queue
// (http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue)

#ifndef BUFFERED_CHANNEL_H
#define BUFFERED_CHANNEL_H

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <type_traits>

#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/fiber/detail/config.hpp>

// Cacheline geometry used for the alignment/padding below; defined locally so
// the header is self-contained. 64 bytes is assumed, which matches common
// x86 and recent ARM cachelines.
static constexpr std::size_t cacheline_length{ 64 };
static constexpr std::size_t cache_alignment{ cacheline_length };

enum class channel_op_status {
    success = 0,
    empty,
    full,
    closed,
    timeout
};

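// Bounded MPMC channel: the ring buffer itself follows Dmitry Vyukov's
// lock-free MPMC queue (per-slot "cycle" counters), while a mutex and two
// condition variables are layered on top only to let push()/value_pop()
// block when the ring is full/empty and to implement close().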
template< typename T >
class buffered_channel {
public:
    typedef T   value_type;

private:
    typedef typename std::aligned_storage< sizeof( T), alignof( T) >::type  storage_type;

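    // One ring-buffer cell: the atomic cycle counter implements the
    // publication protocol, while the raw storage holds a value_type that is
    // constructed and destroyed in place.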
    struct alignas(cache_alignment) slot {
        std::atomic< std::size_t >  cycle{ 0 };
        storage_type                storage{};

        slot() = default;
    };

    // producer cacheline
    alignas(cache_alignment) std::atomic< std::size_t >     producer_idx_{ 0 };
    // consumer cacheline
    alignas(cache_alignment) std::atomic< std::size_t >     consumer_idx_{ 0 };
    // shared write cacheline
    alignas(cache_alignment) std::atomic_bool               closed_{ false };
    mutable std::mutex                                      mtx_{};
    std::condition_variable                                 not_full_cnd_{};
    std::condition_variable                                 not_empty_cnd_{};
    // shared read cacheline
    alignas(cache_alignment) slot                        *  slots_{ nullptr };
    std::size_t                                             capacity_;
    char                                                    pad_[cacheline_length];
    std::size_t                                             waiting_consumer_{ 0 };

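    // A slot's cycle encodes its state relative to the indices: for producer
    // index idx the slot is writable when cycle == idx, and for consumer
    // index idx it is readable when cycle == idx + 1. A cycle that lags
    // behind the index (negative difference) means the ring is full
    // resp. empty.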
    bool is_full_() {
        std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
        return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx);
    }

    bool is_empty_() {
        std::size_t idx{ consumer_idx_.load( std::memory_order_relaxed) };
        return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx + 1);
    }

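    // Lock-free push attempt: claim the slot whose cycle matches the current
    // producer index via CAS on producer_idx_, construct the value in place,
    // then publish it by storing cycle = idx + 1 (release) so consumers see
    // the completed write.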
    template< typename ValueType >
    channel_op_status try_push_( ValueType && value) {
        slot * s{ nullptr };
        std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
        for (;;) {
            s = & slots_[idx & (capacity_ - 1)];
            std::size_t cycle{ s->cycle.load( std::memory_order_acquire) };
            std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx) };
            if ( 0 == diff) {
                if ( producer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
                    break;
                }
            } else if ( 0 > diff) {
                return channel_op_status::full;
            } else {
                idx = producer_idx_.load( std::memory_order_relaxed);
            }
        }
        ::new ( static_cast< void * >( std::addressof( s->storage) ) ) value_type( std::forward< ValueType >( value) );
        s->cycle.store( idx + 1, std::memory_order_release);
        return channel_op_status::success;
    }

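    // Lock-free pop attempt: claim the slot whose cycle equals idx + 1 via
    // CAS on consumer_idx_. The caller reads the value and afterwards stores
    // cycle = idx + capacity_ to hand the slot back to the producers.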
    channel_op_status try_value_pop_( slot *& s, std::size_t & idx) {
        idx = consumer_idx_.load( std::memory_order_relaxed);
        for (;;) {
            s = & slots_[idx & (capacity_ - 1)];
            std::size_t cycle = s->cycle.load( std::memory_order_acquire);
            std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx + 1) };
            if ( 0 == diff) {
                if ( consumer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
                    break;
                }
            } else if ( 0 > diff) {
                return channel_op_status::empty;
            } else {
                idx = consumer_idx_.load( std::memory_order_relaxed);
            }
        }
        // incrementing the slot cycle must be deferred until the value has been consumed;
        // the slot cycle tells producers that the cell can be re-used (store a new value)
        return channel_op_status::success;
    }

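    // Non-blocking pop: moves the value out and immediately releases the slot.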
    channel_op_status try_pop_( value_type & value) {
        slot * s{ nullptr };
        std::size_t idx{ 0 };
        channel_op_status status{ try_value_pop_( s, idx) };
        if ( channel_op_status::success == status) {
            value = std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) );
            s->cycle.store( idx + capacity_, std::memory_order_release);
        }
        return status;
    }

public:
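    // The capacity must be a power of two so that `idx & (capacity_ - 1)`
    // works as a cheap modulo; each slot's cycle starts at its own index,
    // marking every slot as writable for the first lap around the ring.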
    explicit buffered_channel( std::size_t capacity) :
        capacity_{ capacity } {
        if ( 0 == capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
            throw std::runtime_error{ "boost fiber: buffer capacity is invalid" };
        }
        slots_ = new slot[capacity_]();
        for ( std::size_t i = 0; i < capacity_; ++i) {
            slots_[i].cycle.store( i, std::memory_order_relaxed);
        }
    }

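    // The destructor closes the channel, destroys any values still sitting
    // in the ring, and frees the slot array.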
    ~buffered_channel() {
        close();
        for (;;) {
            slot * s{ nullptr };
            std::size_t idx{ 0 };
            if ( channel_op_status::success == try_value_pop_( s, idx) ) {
                reinterpret_cast< value_type * >( std::addressof( s->storage) )->~value_type();
                s->cycle.store( idx + capacity_, std::memory_order_release);
            } else {
                break;
            }
        }
        delete [] slots_;
    }

    buffered_channel( buffered_channel const&) = delete;
    buffered_channel & operator=( buffered_channel const&) = delete;

    bool is_closed() const noexcept {
        return closed_.load( std::memory_order_acquire);
    }

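    // close() takes the mutex before raising the flag so that a blocked
    // producer or consumer either observes closed_ before it goes to sleep
    // or is woken by the notify_all() calls below.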
    void close() noexcept {
        std::unique_lock< std::mutex > lk{ mtx_ };
        closed_.store( true, std::memory_order_release);
        not_full_cnd_.notify_all();
        not_empty_cnd_.notify_all();
    }

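    // Blocking push: the fast path is the lock-free try_push_(); the mutex
    // is only taken to notify a waiting consumer or, when the ring is full,
    // to re-check the state and wait on not_full_cnd_.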
    channel_op_status push( value_type const& value) {
        for (;;) {
            if ( is_closed() ) {
                return channel_op_status::closed;
            }
            channel_op_status status{ try_push_( value) };
            if ( channel_op_status::success == status) {
                std::unique_lock< std::mutex > lk{ mtx_ };
                if ( 0 < waiting_consumer_) {
                    not_empty_cnd_.notify_one();
                }
                return status;
            } else if ( channel_op_status::full == status) {
                std::unique_lock< std::mutex > lk{ mtx_ };
                if ( is_closed() ) {
                    return channel_op_status::closed;
                }
                if ( ! is_full_() ) {
                    continue;
                }
                not_full_cnd_.wait( lk, [this]{ return is_closed() || ! is_full_(); });
            } else {
                BOOST_ASSERT( channel_op_status::closed == status);
                return status;
            }
        }
    }

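    // Blocking pop: the fast path is the lock-free try_value_pop_(); on an
    // empty ring the consumer registers itself in waiting_consumer_ (under
    // the mutex) and sleeps on not_empty_cnd_. A closed and drained channel
    // is reported by throwing.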
    value_type value_pop() {
        for (;;) {
            slot * s{ nullptr };
            std::size_t idx{ 0 };
            channel_op_status status{ try_value_pop_( s, idx) };
            if ( channel_op_status::success == status) {
                value_type value{ std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ) };
                s->cycle.store( idx + capacity_, std::memory_order_release);
                not_full_cnd_.notify_one();
                return value;
            } else if ( channel_op_status::empty == status) {
                std::unique_lock< std::mutex > lk{ mtx_ };
                ++waiting_consumer_;
                if ( is_closed() ) {
                    --waiting_consumer_;
                    throw std::runtime_error{ "boost fiber: channel is closed" };
                }
                if ( ! is_empty_() ) {
                    // a producer raced us; undo the registration and retry
                    --waiting_consumer_;
                    continue;
                }
                not_empty_cnd_.wait( lk, [this](){ return is_closed() || ! is_empty_(); });
                --waiting_consumer_;
            } else {
                BOOST_ASSERT( channel_op_status::closed == status);
                throw std::runtime_error{ "boost fiber: channel is closed" };
            }
        }
    }
};

#endif // BUFFERED_CHANNEL_H
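
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of the original header): one
// producer thread pushes integers while a consumer thread pops until the
// channel has been closed and drained, at which point value_pop() throws.
//
//   #include "buffered_channel.h"
//   #include <iostream>
//   #include <thread>
//
//   int main() {
//       buffered_channel< int > chan{ 8 };   // capacity must be a power of two
//       std::thread producer{ [&chan]{
//           for ( int i = 0; i < 100; ++i) {
//               chan.push( i);               // blocks while the ring is full
//           }
//           chan.close();                    // wakes any blocked consumer
//       } };
//       std::thread consumer{ [&chan]{
//           try {
//               for (;;) {
//                   std::cout << chan.value_pop() << "\n";  // blocks while empty
//               }
//           } catch ( std::runtime_error const&) {
//               // channel was closed and drained
//           }
//       } };
//       producer.join();
//       consumer.join();
//       return 0;
//   }
// ---------------------------------------------------------------------------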