• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1[/
2          Copyright Oliver Kowalke 2013.
3 Distributed under the Boost Software License, Version 1.0.
4    (See accompanying file LICENSE_1_0.txt or copy at
5          http://www.boost.org/LICENSE_1_0.txt
6]
7
8[section:unbuffered_channel Unbuffered Channel]
9
10__boost_fiber__ provides template `unbuffered_channel` suitable to synchronize
11fibers (running on same or different threads) via synchronous message passing.
12A fiber waiting to consume a value will block until the value is produced.
13If a fiber attempts to send a value through an unbuffered channel and no fiber
14is waiting to receive the value, the channel will block the sending fiber.
15
16The unbuffered channel acts as a `rendezvous point`.
17
18    typedef boost::fibers::unbuffered_channel< int > channel_t;
19
20    void send( channel_t & chan) {
21        for ( int i = 0; i < 5; ++i) {
22            chan.push( i);
23        }
24        chan.close();
25    }
26
27    void recv( channel_t & chan) {
28        int i;
29        while ( boost::fibers::channel_op_status::success == chan.pop(i) ) {
30            std::cout << "received " << i << std::endl;
31        }
32    }
33
34    channel_t chan;
35    boost::fibers::fiber f1( std::bind( send, std::ref( chan) ) );
36    boost::fibers::fiber f2( std::bind( recv, std::ref( chan) ) );
37
38    f1.join();
39    f2.join();
40
41Range-for syntax is supported:
42
43    typedef boost::fibers::unbuffered_channel< int > channel_t;
44
45    void foo( channel_t & chan) {
46        chan.push( 1);
47        chan.push( 1);
48        chan.push( 2);
49        chan.push( 3);
50        chan.push( 5);
51        chan.push( 8);
52        chan.push( 13);
53        chan.close();
54    }
55
56    void bar( channel_t & chan) {
57        for ( unsigned int value : chan) {
58            std::cout << value << " ";
59        }
60        std::cout << std::endl;
61    }
62
63
64[template_heading unbuffered_channel]
65
66        #include <boost/fiber/unbuffered_channel.hpp>
67
68        namespace boost {
69        namespace fibers {
70
71        template< typename T >
72        class unbuffered_channel {
73        public:
74            typedef T   value_type;
75
76            class iterator;
77
78            unbuffered_channel();
79
80            unbuffered_channel( unbuffered_channel const& other) = delete;
81            unbuffered_channel & operator=( unbuffered_channel const& other) = delete;
82
83            void close() noexcept;
84
85            channel_op_status push( value_type const& va);
86            channel_op_status push( value_type && va);
87            template< typename Rep, typename Period >
88            channel_op_status push_wait_for(
89                value_type const& va,
90                std::chrono::duration< Rep, Period > const& timeout_duration);
91            template< typename Rep, typename Period >
92            channel_op_status push_wait_for( value_type && va, std::chrono::duration< Rep, Period > const& timeout_duration);
93            template< typename Clock, typename Duration >
94            channel_op_status push_wait_until(
95                value_type const& va,
96                std::chrono::time_point< Clock, Duration > const& timeout_time);
97            template< typename Clock, typename Duration >
98            channel_op_status push_wait_until(
99                value_type && va,
100                std::chrono::time_point< Clock, Duration > const& timeout_time);
101
102            channel_op_status pop( value_type & va);
103            value_type value_pop();
104            template< typename Rep, typename Period >
105            channel_op_status pop_wait_for(
106                value_type & va,
107                std::chrono::duration< Rep, Period > const& timeout_duration);
108            template< typename Clock, typename Duration >
109            channel_op_status pop_wait_until(
110                value_type & va,
111                std::chrono::time_point< Clock, Duration > const& timeout_time);
112        };
113
114        template< typename T >
115        unbuffered_channel< T >::iterator begin( unbuffered_channel< T > & chan);
116
117        template< typename T >
118        unbuffered_channel< T >::iterator end( unbuffered_channel< T > & chan);
119
120        }}
121
122[heading Constructor]
123
124        unbuffered_channel();
125
126[variablelist
127[[Effects:] [The constructor constructs an object of class `unbuffered_channel`.]]
128]
129
130[member_heading unbuffered_channel..close]
131
132        void close() noexcept;
133
134[variablelist
135[[Effects:] [Deactivates the channel. No values can be put after calling
136`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
137or `this->pop_wait_until()` will return `closed`. Fibers blocked in
138`this->value_pop()` will receive an exception.]]
139[[Throws:] [Nothing.]]
140[[Note:] [`close()` is like closing a pipe. It informs waiting consumers
141that no more values will arrive.]]
142]
143
144[template unbuffered_channel_push_effects[enqueues] If channel is closed, returns
145`closed`. [enqueues] the value in the channel, wakes up a fiber
146blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
147`this->pop_wait_until()` and returns `success`.]
148
149[member_heading unbuffered_channel..push]
150
151        channel_op_status push( value_type const& va);
152        channel_op_status push( value_type && va);
153
154[variablelist
155[[Effects:] [[unbuffered_channel_push_effects Otherwise enqueues]]]
156[[Throws:] [Exceptions thrown by copy- or move-operations.]]
157]
158
159[template unbuffered_channel_pop[cls unblocking]
160[member_heading [cls]..pop]
161
162        channel_op_status pop( value_type & va);
163
164[variablelist
165[[Effects:] [Dequeues a value from the channel. If the channel is empty, the
166fiber gets suspended until at least one new item is `push()`ed (return value
167`success` and `va` contains dequeued value) or the channel gets `close()`d
168(return value `closed`)[unblocking]]]
169[[Throws:] [Exceptions thrown by copy- or move-operations.]]
170]
171]
172[unbuffered_channel_pop unbuffered_channel .]
173
174[template unbuffered_channel_value_pop[cls unblocking]
175[member_heading [cls]..value_pop]
176
177        value_type value_pop();
178
179[variablelist
180[[Effects:] [Dequeues a value from the channel. If the channel is empty, the
181fiber gets suspended until at least one new item is `push()`ed or the channel
182gets `close()`d (which throws an exception)[unblocking]]]
183[[Throws:] [`fiber_error` if `*this` is closed; exceptions thrown by copy- or move-operations.]]
184[[Error conditions:] [`std::errc::operation_not_permitted`]]
185]
186]
187[unbuffered_channel_value_pop unbuffered_channel .]
188
189[template unbuffered_channel_pop_wait_until_effects[endtime unblocking] If channel
190is not empty, immediately dequeues a value from the channel. Otherwise
191the fiber gets suspended until at least one new item is `push()`ed (return
192value `success` and `va` contains dequeued value), or the channel gets
193`close()`d (return value `closed`), or the system time reaches [endtime]
194(return value `timeout`)[unblocking]]
195
196[template unbuffered_channel_pop_wait_for[cls unblocking]
197[member_heading [cls]..pop_wait_for]
198
199        template< typename Rep, typename Period >
200        channel_op_status pop_wait_for(
201            value_type & va,
202            std::chrono::duration< Rep, Period > const& timeout_duration);
203
204[variablelist
205[[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout
206time as (system time + `timeout_duration`).
207[unbuffered_channel_pop_wait_until_effects the computed timeout time..[unblocking]]]]
208[[Throws:] [Timeout-related exceptions or exceptions thrown by copy- or move-operations.]]
209]
210]
211[unbuffered_channel_pop_wait_for unbuffered_channel .]
212
213[template unbuffered_channel_pop_wait_until[cls unblocking]
214[member_heading [cls]..pop_wait_until]
215
216        template< typename Clock, typename Duration >
217        channel_op_status pop_wait_until(
218            value_type & va,
219            std::chrono::time_point< Clock, Duration > const& timeout_time);
220
221[variablelist
222[[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`.
223[unbuffered_channel_pop_wait_until_effects the passed `time_point`..[unblocking]]]]
224[[Throws:] [Timeout-related exceptions or exceptions thrown by copy- or move-operations.]]
225]
226]
227[unbuffered_channel_pop_wait_until unbuffered_channel .]
228
229[heading Non-member function `begin( unbuffered_channel< T > &)`]
230    template< typename T >
231    unbuffered_channel< T >::iterator begin( unbuffered_channel< T > &);
232
233[variablelist
234[[Returns:] [Returns a range-iterator (input-iterator).]]
235]
236
237[heading Non-member function `end( unbuffered_channel< T > &)`]
238    template< typename T >
239    unbuffered_channel< T >::iterator end( unbuffered_channel< T > &);
240
241[variablelist
242[[Returns:] [Returns an end range-iterator (input-iterator).]]
243]
244
245[endsect]
246