• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1[/
2          Copyright Oliver Kowalke 2013.
3 Distributed under the Boost Software License, Version 1.0.
4    (See accompanying file LICENSE_1_0.txt or copy at
5          http://www.boost.org/LICENSE_1_0.txt
6]
7
8[section:buffered_channel Buffered Channel]
9
10__boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable for
11synchronizing fibers (running on the same or on different threads) via asynchronous
12message passing.
13
14    typedef boost::fibers::buffered_channel< int > channel_t;
15
16    void send( channel_t & chan) {
17        for ( int i = 0; i < 5; ++i) {
18            chan.push( i);
19        }
20        chan.close();
21    }
22
23    void recv( channel_t & chan) {
24        int i;
25        while ( boost::fibers::channel_op_status::success == chan.pop(i) ) {
26            std::cout << "received " << i << std::endl;
27        }
28    }
29
30    channel_t chan{ 2 };
31    boost::fibers::fiber f1( std::bind( send, std::ref( chan) ) );
32    boost::fibers::fiber f2( std::bind( recv, std::ref( chan) ) );
33
34    f1.join();
35    f2.join();
36
37Class `buffered_channel` supports range-for syntax:
38
39    typedef boost::fibers::buffered_channel< int > channel_t;
40
41    void foo( channel_t & chan) {
42        chan.push( 1);
43        chan.push( 1);
44        chan.push( 2);
45        chan.push( 3);
46        chan.push( 5);
47        chan.push( 8);
48        chan.push( 12);
49        chan.close();
50    }
51
52    void bar( channel_t & chan) {
53        for ( unsigned int value : chan) {
54            std::cout << value << " ";
55        }
56        std::cout << std::endl;
57    }
58
59
60[template_heading buffered_channel]
61
62        #include <boost/fiber/buffered_channel.hpp>
63
64        namespace boost {
65        namespace fibers {
66
67        template< typename T >
68        class buffered_channel {
69        public:
70            typedef T   value_type;
71
72            class iterator;
73
74            explicit buffered_channel( std::size_t capacity);
75
76            buffered_channel( buffered_channel const& other) = delete;
77            buffered_channel & operator=( buffered_channel const& other) = delete;
78
79            void close() noexcept;
80
81            channel_op_status push( value_type const& va);
82            channel_op_status push( value_type && va);
83            template< typename Rep, typename Period >
84            channel_op_status push_wait_for(
85                value_type const& va,
86                std::chrono::duration< Rep, Period > const& timeout_duration);
87            template< typename Rep, typename Period >
88            channel_op_status push_wait_for( value_type && va, std::chrono::duration< Rep, Period > const& timeout_duration);
89            template< typename Clock, typename Duration >
90            channel_op_status push_wait_until(
91                value_type const& va,
92                std::chrono::time_point< Clock, Duration > const& timeout_time);
93            template< typename Clock, typename Duration >
94            channel_op_status push_wait_until(
95                value_type && va,
96                std::chrono::time_point< Clock, Duration > const& timeout_time);
97            channel_op_status try_push( value_type const& va);
98            channel_op_status try_push( value_type && va);
99
100            channel_op_status pop( value_type & va);
101            value_type value_pop();
102            template< typename Rep, typename Period >
103            channel_op_status pop_wait_for(
104                value_type & va,
105                std::chrono::duration< Rep, Period > const& timeout_duration);
106            template< typename Clock, typename Duration >
107            channel_op_status pop_wait_until(
108                value_type & va,
109                std::chrono::time_point< Clock, Duration > const& timeout_time);
110            channel_op_status try_pop( value_type & va);
111        };
112
113        template< typename T >
114        buffered_channel< T >::iterator begin( buffered_channel< T > & chan);
115
116        template< typename T >
117        buffered_channel< T >::iterator end( buffered_channel< T > & chan);
118
119        }}
120
121[heading Constructor]
122
123        explicit buffered_channel( std::size_t capacity);
124
125[variablelist
126[[Preconditions:] [`2<=capacity && 0==(capacity & (capacity-1))`]]
127[[Effects:] [The constructor constructs an object of class `buffered_channel`
128with an internal buffer of size `capacity`.]]
129[[Throws:] [`fiber_error`]]
130[[Error Conditions:] [
131[*invalid_argument]: if `2>capacity || 0!=(capacity & (capacity-1))`.]]
132[[Notes:] [A `push()`, `push_wait_for()` or `push_wait_until()` will not block
133until the number of values in the channel becomes equal to `capacity - 1`.
134The channel can hold only `capacity - 1` elements; once it holds that many it is
135considered to be full.]]
136]
137
138[member_heading buffered_channel..close]
139
140        void close() noexcept;
141
142[variablelist
143[[Effects:] [Deactivates the channel. No values can be put after calling
144`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
145or `this->pop_wait_until()` will return `closed`. Fibers blocked in
146`this->value_pop()` will receive an exception.]]
147[[Throws:] [Nothing.]]
148[[Note:] [`close()` is like closing a pipe. It informs waiting consumers
149that no more values will arrive.]]
150]
151
152[template buffered_channel_push_effects[enqueues] If channel is closed, returns
153`closed`. [enqueues] the value in the channel, wakes up a fiber
154blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
155`this->pop_wait_until()` and returns `success`. If the channel is full,
156the fiber is blocked.]
157
158[member_heading buffered_channel..push]
159
160        channel_op_status push( value_type const& va);
161        channel_op_status push( value_type && va);
162
163[variablelist
164[[Effects:] [[buffered_channel_push_effects Otherwise enqueues]]]
165[[Throws:] [Exceptions thrown by copy- or move-operations.]]
166]
167
168[template buffered_channel_try_push_effects[enqueues] If channel is closed, returns
169`closed`. [enqueues] the value in the channel, wakes up a fiber
170blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
171`this->pop_wait_until()` and returns `success`. If the channel is full,
172it doesn't block and returns `full`.]
173
174[member_heading buffered_channel..try_push]
175
176        channel_op_status try_push( value_type const& va);
177        channel_op_status try_push( value_type && va);
178
179[variablelist
180[[Effects:] [[buffered_channel_try_push_effects Otherwise enqueues]]]
181[[Throws:] [Exceptions thrown by copy- or move-operations.]]
182]
183
184[template buffered_channel_pop[cls unblocking]
185[member_heading [cls]..pop]
186
187        channel_op_status pop( value_type & va);
188
189[variablelist
190[[Effects:] [Dequeues a value from the channel. If the channel is empty, the
191fiber gets suspended until at least one new item is `push()`ed (return value
192`success` and `va` contains dequeued value) or the channel gets `close()`d
193(return value `closed`)[unblocking]]]
194[[Throws:] [Exceptions thrown by copy- or move-operations.]]
195]
196]
197[buffered_channel_pop buffered_channel .]
198
199[template buffered_channel_value_pop[cls unblocking]
200[member_heading [cls]..value_pop]
201
202        value_type value_pop();
203
204[variablelist
205[[Effects:] [Dequeues a value from the channel. If the channel is empty, the
206fiber gets suspended until at least one new item is `push()`ed or the channel
207gets `close()`d (which throws an exception)[unblocking]]]
208[[Throws:] [`fiber_error` if `*this` is closed; exceptions thrown by copy- or move-operations.]]
209[[Error conditions:] [`std::errc::operation_not_permitted`]]
210]
211]
212[buffered_channel_value_pop buffered_channel .]
213
214[template buffered_channel_try_pop[cls unblocking]
215[member_heading [cls]..try_pop]
216
217        channel_op_status try_pop( value_type & va);
218
219[variablelist
220[[Effects:] [If channel is empty, returns `empty`. If channel is closed,
221returns `closed`. Otherwise it returns `success` and `va` contains the
222dequeued value[unblocking]]]
223[[Throws:] [Exceptions thrown by copy- or move-operations.]]
224]
225]
226[buffered_channel_try_pop buffered_channel .]
227
228[template buffered_channel_pop_wait_until_effects[endtime unblocking] If channel
229is not empty, immediately dequeues a value from the channel. Otherwise
230the fiber gets suspended until at least one new item is `push()`ed (return
231value `success` and `va` contains dequeued value), or the channel gets
232`close()`d (return value `closed`), or the system time reaches [endtime]
233(return value `timeout`)[unblocking]]
234
235[template buffered_channel_pop_wait_for[cls unblocking]
236[member_heading [cls]..pop_wait_for]
237
238        template< typename Rep, typename Period >
239        channel_op_status pop_wait_for(
240            value_type & va,
241            std::chrono::duration< Rep, Period > const& timeout_duration);
242
243[variablelist
244[[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout
245time as (system time + `timeout_duration`).
246[buffered_channel_pop_wait_until_effects the computed timeout time..[unblocking]]]]
247[[Throws:] [Timeout-related exceptions or exceptions thrown by copy- or move-operations.]]
248]
249]
250[buffered_channel_pop_wait_for buffered_channel .]
251
252[template buffered_channel_pop_wait_until[cls unblocking]
253[member_heading [cls]..pop_wait_until]
254
255        template< typename Clock, typename Duration >
256        channel_op_status pop_wait_until(
257            value_type & va,
258            std::chrono::time_point< Clock, Duration > const& timeout_time);
259
260[variablelist
261[[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`.
262[buffered_channel_pop_wait_until_effects the passed `time_point`..[unblocking]]]]
263[[Throws:] [Timeout-related exceptions or exceptions thrown by copy- or move-operations.]]
264]
265]
266[buffered_channel_pop_wait_until buffered_channel .]
267
268[heading Non-member function `begin( buffered_channel< T > &)`]
269    template< typename T >
270    buffered_channel< T >::iterator begin( buffered_channel< T > &);
271
272[variablelist
273[[Returns:] [Returns a range-iterator (input-iterator).]]
274]
275
276[heading Non-member function `end( buffered_channel< T > &)`]
277    template< typename T >
278    buffered_channel< T >::iterator end( buffered_channel< T > &);
279
280[variablelist
281[[Returns:] [Returns an end range-iterator (input-iterator).]]
282]
283
284[endsect]
285