1[/ 2 Copyright Oliver Kowalke 2013. 3 Distributed under the Boost Software License, Version 1.0. 4 (See accompanying file LICENSE_1_0.txt or copy at 5 http://www.boost.org/LICENSE_1_0.txt 6] 7 8[section:buffered_channel Buffered Channel] 9 10__boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable to 11synchronize fibers (running on same or different threads) via asynchronous 12message passing. 13 14 typedef boost::fibers::buffered_channel< int > channel_t; 15 16 void send( channel_t & chan) { 17 for ( int i = 0; i < 5; ++i) { 18 chan.push( i); 19 } 20 chan.close(); 21 } 22 23 void recv( channel_t & chan) { 24 int i; 25 while ( boost::fibers::channel_op_status::success == chan.pop(i) ) { 26 std::cout << "received " << i << std::endl; 27 } 28 } 29 30 channel_t chan{ 2 }; 31 boost::fibers::fiber f1( std::bind( send, std::ref( chan) ) ); 32 boost::fibers::fiber f2( std::bind( recv, std::ref( chan) ) ); 33 34 f1.join(); 35 f2.join(); 36 37Class `buffered_channel` supports range-for syntax: 38 39 typedef boost::fibers::buffered_channel< int > channel_t; 40 41 void foo( channel_t & chan) { 42 chan.push( 1); 43 chan.push( 1); 44 chan.push( 2); 45 chan.push( 3); 46 chan.push( 5); 47 chan.push( 8); 48 chan.push( 12); 49 chan.close(); 50 } 51 52 void bar( channel_t & chan) { 53 for ( unsigned int value : chan) { 54 std::cout << value << " "; 55 } 56 std::cout << std::endl; 57 } 58 59 60[template_heading buffered_channel] 61 62 #include <boost/fiber/buffered_channel.hpp> 63 64 namespace boost { 65 namespace fibers { 66 67 template< typename T > 68 class buffered_channel { 69 public: 70 typedef T value_type; 71 72 class iterator; 73 74 explicit buffered_channel( std::size_t capacity); 75 76 buffered_channel( buffered_channel const& other) = delete; 77 buffered_channel & operator=( buffered_channel const& other) = delete; 78 79 void close() noexcept; 80 81 channel_op_status push( value_type const& va); 82 channel_op_status push( value_type && va); 83 
template< typename Rep, typename Period > 84 channel_op_status push_wait_for( 85 value_type const& va, 86 std::chrono::duration< Rep, Period > const& timeout_duration); 87 template< typename Rep, typename Period > channel_op_status push_wait_for( value_type && va, 88 std::chrono::duration< Rep, Period > const& timeout_duration); 89 template< typename Clock, typename Duration > 90 channel_op_status push_wait_until( 91 value_type const& va, 92 std::chrono::time_point< Clock, Duration > const& timeout_time); 93 template< typename Clock, typename Duration > 94 channel_op_status push_wait_until( 95 value_type && va, 96 std::chrono::time_point< Clock, Duration > const& timeout_time); 97 channel_op_status try_push( value_type const& va); 98 channel_op_status try_push( value_type && va); 99 100 channel_op_status pop( value_type & va); 101 value_type value_pop(); 102 template< typename Rep, typename Period > 103 channel_op_status pop_wait_for( 104 value_type & va, 105 std::chrono::duration< Rep, Period > const& timeout_duration); 106 template< typename Clock, typename Duration > 107 channel_op_status pop_wait_until( 108 value_type & va, 109 std::chrono::time_point< Clock, Duration > const& timeout_time); 110 channel_op_status try_pop( value_type & va); 111 }; 112 113 template< typename T > 114 buffered_channel< T >::iterator begin( buffered_channel< T > & chan); 115 116 template< typename T > 117 buffered_channel< T >::iterator end( buffered_channel< T > & chan); 118 119 }} 120 121[heading Constructor] 122 123 explicit buffered_channel( std::size_t capacity); 124 125[variablelist 126[[Preconditions:] [`2<=capacity && 0==(capacity & (capacity-1))`]] 127[[Effects:] [The constructor constructs an object of class `buffered_channel` 128with an internal buffer of size `capacity`.]] 129[[Throws:] [`fiber_error`]] 130[[Error Conditions:] [ 131[*invalid_argument]: if `0==capacity || 0!=(capacity & (capacity-1))`.]] 132[[Notes:] [A `push()`, `push_wait_for()` or `push_wait_until()` will not block 133until the number of 
values in the channel becomes equal to `capacity`. 134The channel can hold only `capacity - 1` elements, otherwise it is 135considered to be full.]] 136] 137 138[member_heading buffered_channel..close] 139 140 void close() noexcept; 141 142[variablelist 143[[Effects:] [Deactivates the channel. No values can be put after calling 144`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` 145or `this->pop_wait_until()` will return `closed`. Fibers blocked in 146`this->value_pop()` will receive an exception.]] 147[[Throws:] [Nothing.]] 148[[Note:] [`close()` is like closing a pipe. It informs waiting consumers 149that no more values will arrive.]] 150] 151 152[template buffered_channel_push_effects[enqueues] If channel is closed, returns 153`closed`. [enqueues] the value in the channel, wakes up a fiber 154blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or 155`this->pop_wait_until()` and returns `success`. If the channel is full, 156the fiber is blocked.] 157 158[member_heading buffered_channel..push] 159 160 channel_op_status push( value_type const& va); 161 channel_op_status push( value_type && va); 162 163[variablelist 164[[Effects:] [[buffered_channel_push_effects Otherwise enqueues]]] 165[[Throws:] [Exceptions thrown by copy- or move-operations.]] 166] 167 168[template buffered_channel_try_push_effects[enqueues] If channel is closed, returns 169`closed`. [enqueues] the value in the channel, wakes up a fiber 170blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or 171`this->pop_wait_until()` and returns `success`. If the channel is full, 172it doesn't block and returns `full`.] 
173 174[member_heading buffered_channel..try_push] 175 176 channel_op_status try_push( value_type const& va); 177 channel_op_status try_push( value_type && va); 178 179[variablelist 180[[Effects:] [[buffered_channel_try_push_effects Otherwise enqueues]]] 181[[Throws:] [Exceptions thrown by copy- or move-operations.]] 182] 183 184[template buffered_channel_pop[cls unblocking] 185[member_heading [cls]..pop] 186 187 channel_op_status pop( value_type & va); 188 189[variablelist 190[[Effects:] [Dequeues a value from the channel. If the channel is empty, the 191fiber gets suspended until at least one new item is `push()`ed (return value 192`success` and `va` contains dequeued value) or the channel gets `close()`d 193(return value `closed`)[unblocking]]] 194[[Throws:] [Exceptions thrown by copy- or move-operations.]] 195] 196] 197[buffered_channel_pop buffered_channel .] 198 199[template buffered_channel_value_pop[cls unblocking] 200[member_heading [cls]..value_pop] 201 202 value_type value_pop(); 203 204[variablelist 205[[Effects:] [Dequeues a value from the channel. If the channel is empty, the 206fiber gets suspended until at least one new item is `push()`ed or the channel 207gets `close()`d (which throws an exception)[unblocking]]] 208[[Throws:] [`fiber_error` if `*this` is closed, or exceptions thrown by copy- or move-operations.]] 209[[Error Conditions:] [`std::errc::operation_not_permitted`]] 210] 211] 212[buffered_channel_value_pop buffered_channel .] 213 214[template buffered_channel_try_pop[cls unblocking] 215[member_heading [cls]..try_pop] 216 217 channel_op_status try_pop( value_type & va); 218 219[variablelist 220[[Effects:] [If channel is empty, returns `empty`. If channel is closed, 221returns `closed`. Otherwise it returns `success` and `va` contains the 222dequeued value[unblocking]]] 223[[Throws:] [Exceptions thrown by copy- or move-operations.]] 224] 225] 226[buffered_channel_try_pop buffered_channel .] 
227 228[template buffered_channel_pop_wait_until_effects[endtime unblocking] If channel 229is not empty, immediately dequeues a value from the channel. Otherwise 230the fiber gets suspended until at least one new item is `push()`ed (return 231value `success` and `va` contains dequeued value), or the channel gets 232`close()`d (return value `closed`), or the system time reaches [endtime] 233(return value `timeout`)[unblocking]] 234 235[template buffered_channel_pop_wait_for[cls unblocking] 236[member_heading [cls]..pop_wait_for] 237 238 template< typename Rep, typename Period > 239 channel_op_status pop_wait_for( 240 value_type & va, 241 std::chrono::duration< Rep, Period > const& timeout_duration); 242 243[variablelist 244[[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout 245time as (system time + `timeout_duration`). 246[buffered_channel_pop_wait_until_effects the computed timeout time..[unblocking]]]] 247[[Throws:] [Timeout-related exceptions or exceptions thrown by copy- or move-operations.]] 248] 249] 250[buffered_channel_pop_wait_for buffered_channel .] 251 252[template buffered_channel_pop_wait_until[cls unblocking] 253[member_heading [cls]..pop_wait_until] 254 255 template< typename Clock, typename Duration > 256 channel_op_status pop_wait_until( 257 value_type & va, 258 std::chrono::time_point< Clock, Duration > const& timeout_time); 259 260[variablelist 261[[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`. 262[buffered_channel_pop_wait_until_effects the passed `time_point`..[unblocking]]]] 263[[Throws:] [Timeout-related exceptions or exceptions thrown by copy- or move-operations.]] 264] 265] 266[buffered_channel_pop_wait_until buffered_channel .] 
267 268[heading Non-member function `begin( buffered_channel< T > &)`] 269 template< typename T > 270 buffered_channel< T >::iterator begin( buffered_channel< T > &); 271 272[variablelist 273[[Returns:] [Returns a range-iterator (input-iterator).]] 274] 275 276[heading Non-member function `end( buffered_channel< T > &)`] 277 template< typename T > 278 buffered_channel< T >::iterator end( buffered_channel< T > &); 279 280[variablelist 281[[Returns:] [Returns an end range-iterator (input-iterator).]] 282] 283 284[endsect] 285