// Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
#define BOOST_FIBERS_UNBUFFERED_CHANNEL_H

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/config.hpp>

#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
#include <boost/fiber/detail/exchange.hpp>
#endif
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {

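// unbuffered_channel is a rendezvous channel: push() blocks the producing
// fiber until a consumer has taken the value, and pop() blocks the consuming
// fiber until a producer offers one. At most one value is in flight at a time
// (the single atomic slot pointer below).
//
// Usage sketch (illustrative only; it assumes boost::fibers::fiber and the
// channel_op_status codes from the headers included above, and is not part of
// this header's interface documentation):
//
//   boost::fibers::unbuffered_channel< int > chan;
//   boost::fibers::fiber producer{ [&chan]{
//       for ( int i = 0; i < 3; ++i) {
//           chan.push( i);    // blocks until a consumer has received i
//       }
//       chan.close();         // unblocks any waiting consumers
//   } };
//   int i;
//   while ( boost::fibers::channel_op_status::success == chan.pop( i) ) {
//       // use i
//   }
//   producer.join();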
template< typename T >
class unbuffered_channel {
public:
    using value_type = typename std::remove_reference<T>::type;

private:
    using wait_queue_type = context::wait_queue_t;

    struct slot {
        value_type value;
        context * ctx;

        slot( value_type const& value_, context * ctx_) :
            value{ value_ },
            ctx{ ctx_ } {
        }

        slot( value_type && value_, context * ctx_) :
            value{ std::move( value_) },
            ctx{ ctx_ } {
        }
    };

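    // Wake-up handshake used throughout this header (a summary of the code
    // below): before suspending, an untimed waiter stores 0 into its context's
    // twstatus, while a timed waiter stores this channel's address. A notifier
    // claims a timed waiter by CAS-ing that address to -1 and schedules it on
    // success; if the CAS fails but the observed value is 0, the waiter is
    // untimed and is scheduled unconditionally; any other observed value means
    // the waiter no longer needs this wake-up and the entry is skipped.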
    // shared cacheline
    std::atomic< slot * > slot_{ nullptr };
    // shared cacheline
    std::atomic_bool closed_{ false };
    mutable detail::spinlock splk_producers_{};
    wait_queue_type waiting_producers_{};
    mutable detail::spinlock splk_consumers_{};
    wait_queue_type waiting_consumers_{};
    char pad_[cacheline_length];

    bool is_empty_() {
        return nullptr == slot_.load( std::memory_order_acquire);
    }

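    // Try to install own_slot as the value currently offered for hand-off;
    // fails if another producer's slot is already pending.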
    bool try_push_( slot * own_slot) {
        for (;;) {
            slot * s = slot_.load( std::memory_order_acquire);
            if ( nullptr == s) {
                if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
                    continue;
                }
                return true;
            }
            return false;
        }
    }

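    // Take the currently offered slot, if any; returns nullptr when no
    // producer is waiting for a hand-off.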
    slot * try_pop_() {
        slot * nil_slot = nullptr;
        for (;;) {
            slot * s = slot_.load( std::memory_order_acquire);
            if ( nullptr != s) {
                if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
                    continue;
                }
            }
            return s;
        }
    }

public:
    unbuffered_channel() = default;

    ~unbuffered_channel() {
        close();
    }

    unbuffered_channel( unbuffered_channel const&) = delete;
    unbuffered_channel & operator=( unbuffered_channel const&) = delete;

    bool is_closed() const noexcept {
        return closed_.load( std::memory_order_acquire);
    }

    void close() noexcept {
        context * active_ctx = context::active();
        // set flag
        if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
            // notify the producer currently waiting for its value to be taken
            slot * s = slot_.load( std::memory_order_acquire);
            if ( nullptr != s) {
                // notify context
                active_ctx->schedule( s->ctx);
            }
            // notify all waiting producers
            detail::spinlock_lock lk1{ splk_producers_ };
            while ( ! waiting_producers_.empty() ) {
                context * producer_ctx = & waiting_producers_.front();
                waiting_producers_.pop_front();
                auto expected = reinterpret_cast< std::intptr_t >( this);
                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( producer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( producer_ctx);
                }
            }
            // notify all waiting consumers
            detail::spinlock_lock lk2{ splk_consumers_ };
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                auto expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                }
            }
        }
    }

    channel_op_status push( value_type const& value) {
        context * active_ctx = context::active();
        slot s{ value, active_ctx };
        for (;;) {
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( try_push_( & s) ) {
                detail::spinlock_lock lk{ splk_consumers_ };
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                // suspend until the value has been consumed
                active_ctx->suspend( lk);
                // resumed
                if ( nullptr == s.ctx) {
                    // value has been consumed
                    return channel_op_status::success;
                }
                // channel was closed before value was consumed
                return channel_op_status::closed;
            }
            detail::spinlock_lock lk{ splk_producers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_producers_);
            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
            // suspend this producer
            active_ctx->suspend( lk);
            // resumed, slot maybe free
        }
    }

    channel_op_status push( value_type && value) {
        context * active_ctx = context::active();
        slot s{ std::move( value), active_ctx };
        for (;;) {
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( try_push_( & s) ) {
                detail::spinlock_lock lk{ splk_consumers_ };
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                // suspend until the value has been consumed
                active_ctx->suspend( lk);
                // resumed
                if ( nullptr == s.ctx) {
                    // value has been consumed
                    return channel_op_status::success;
                }
                // channel was closed before value was consumed
                return channel_op_status::closed;
            }
            detail::spinlock_lock lk{ splk_producers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_producers_);
            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
            // suspend this producer
            active_ctx->suspend( lk);
            // resumed, slot maybe free
        }
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type const& value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( value,
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type && value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( std::forward< value_type >( value),
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type const& value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        slot s{ value, active_ctx };
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( try_push_( & s) ) {
                detail::spinlock_lock lk{ splk_consumers_ };
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                // suspend this producer
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // clear slot
                    slot * nil_slot = nullptr, * own_slot = & s;
                    slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
                    // resumed, value has not been consumed
                    return channel_op_status::timeout;
                }
                // resumed
                if ( nullptr == s.ctx) {
                    // value has been consumed
                    return channel_op_status::success;
                }
                // channel was closed before value was consumed
                return channel_op_status::closed;
            }
            detail::spinlock_lock lk{ splk_producers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_producers_);
            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
            // suspend this producer
            if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                // relock local lk
                lk.lock();
                // remove from waiting-queue
                waiting_producers_.remove( * active_ctx);
                return channel_op_status::timeout;
            }
            // resumed, slot maybe free
        }
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type && value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        slot s{ std::move( value), active_ctx };
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( try_push_( & s) ) {
                detail::spinlock_lock lk{ splk_consumers_ };
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                // suspend this producer
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // clear slot
                    slot * nil_slot = nullptr, * own_slot = & s;
                    slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
                    // resumed, value has not been consumed
                    return channel_op_status::timeout;
                }
                // resumed
                if ( nullptr == s.ctx) {
                    // value has been consumed
                    return channel_op_status::success;
                }
                // channel was closed before value was consumed
                return channel_op_status::closed;
            }
            detail::spinlock_lock lk{ splk_producers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_producers_);
            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
            // suspend this producer
            if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                // relock local lk
                lk.lock();
                // remove from waiting-queue
                waiting_producers_.remove( * active_ctx);
                return channel_op_status::timeout;
            }
            // resumed, slot maybe free
        }
    }

    channel_op_status pop( value_type & value) {
        context * active_ctx = context::active();
        slot * s = nullptr;
        for (;;) {
            if ( nullptr != ( s = try_pop_() ) ) {
                {
                    detail::spinlock_lock lk{ splk_producers_ };
                    // notify one waiting producer
                    while ( ! waiting_producers_.empty() ) {
                        context * producer_ctx = & waiting_producers_.front();
                        waiting_producers_.pop_front();
                        auto expected = reinterpret_cast< std::intptr_t >( this);
                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                            lk.unlock();
                            // notify context
                            active_ctx->schedule( producer_ctx);
                            break;
                        }
                        if ( static_cast< std::intptr_t >( 0) == expected) {
                            lk.unlock();
                            // no timed-wait op.
                            // notify context
                            active_ctx->schedule( producer_ctx);
                            break;
                        }
                    }
                }
                value = std::move( s->value);
                // notify context
#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
#else
                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
#endif
                return channel_op_status::success;
            }
            detail::spinlock_lock lk{ splk_consumers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( ! is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_consumers_);
            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
            // suspend this consumer
            active_ctx->suspend( lk);
            // resumed, slot maybe set
        }
    }

    value_type value_pop() {
        context * active_ctx = context::active();
        slot * s = nullptr;
        for (;;) {
            if ( nullptr != ( s = try_pop_() ) ) {
                {
                    detail::spinlock_lock lk{ splk_producers_ };
                    // notify one waiting producer
                    while ( ! waiting_producers_.empty() ) {
                        context * producer_ctx = & waiting_producers_.front();
                        waiting_producers_.pop_front();
                        auto expected = reinterpret_cast< std::intptr_t >( this);
                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                            lk.unlock();
                            // notify context
                            active_ctx->schedule( producer_ctx);
                            break;
                        }
                        if ( static_cast< std::intptr_t >( 0) == expected) {
                            lk.unlock();
                            // no timed-wait op.
                            // notify context
                            active_ctx->schedule( producer_ctx);
                            break;
                        }
                    }
                }
                // consume value
                value_type value = std::move( s->value);
                // notify context
#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
#else
                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
#endif
                return std::move( value);
            }
            detail::spinlock_lock lk{ splk_consumers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                throw fiber_error{
                        std::make_error_code( std::errc::operation_not_permitted),
                        "boost fiber: channel is closed" };
            }
            if ( ! is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_consumers_);
            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
            // suspend this consumer
            active_ctx->suspend( lk);
            // resumed, slot maybe set
        }
    }

    template< typename Rep, typename Period >
    channel_op_status pop_wait_for( value_type & value,
                                    std::chrono::duration< Rep, Period > const& timeout_duration) {
        return pop_wait_until( value,
                               std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status pop_wait_until( value_type & value,
                                      std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        slot * s = nullptr;
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            if ( nullptr != ( s = try_pop_() ) ) {
                {
                    detail::spinlock_lock lk{ splk_producers_ };
                    // notify one waiting producer
                    while ( ! waiting_producers_.empty() ) {
                        context * producer_ctx = & waiting_producers_.front();
                        waiting_producers_.pop_front();
                        auto expected = reinterpret_cast< std::intptr_t >( this);
                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                            lk.unlock();
                            // notify context
                            active_ctx->schedule( producer_ctx);
                            break;
                        }
                        if ( static_cast< std::intptr_t >( 0) == expected) {
                            lk.unlock();
                            // no timed-wait op.
                            // notify context
                            active_ctx->schedule( producer_ctx);
                            break;
                        }
                    }
                }
                // consume value
                value = std::move( s->value);
                // notify context
#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
#else
                active_ctx->schedule( std::exchange( s->ctx, nullptr) );
#endif
                return channel_op_status::success;
            }
            detail::spinlock_lock lk{ splk_consumers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( ! is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_consumers_);
            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
            // suspend this consumer
            if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                // relock local lk
                lk.lock();
                // remove from waiting-queue
                waiting_consumers_.remove( * active_ctx);
                return channel_op_status::timeout;
            }
        }
    }

    class iterator {
    private:
        typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;

        unbuffered_channel * chan_{ nullptr };
        storage_type storage_;

        void increment_() {
            BOOST_ASSERT( nullptr != chan_);
            try {
                ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
            } catch ( fiber_error const&) {
                chan_ = nullptr;
            }
        }

    public:
        using iterator_category = std::input_iterator_tag;
        using difference_type = std::ptrdiff_t;
        using pointer = value_type *;
        using reference = value_type &;

        using pointer_t = pointer;
        using reference_t = reference;

        iterator() noexcept = default;

        explicit iterator( unbuffered_channel< T > * chan) noexcept :
            chan_{ chan } {
            increment_();
        }

        iterator( iterator const& other) noexcept :
            chan_{ other.chan_ } {
        }

        iterator & operator=( iterator const& other) noexcept {
            if ( this == & other) return * this;
            chan_ = other.chan_;
            return * this;
        }

        bool operator==( iterator const& other) const noexcept {
            return other.chan_ == chan_;
        }

        bool operator!=( iterator const& other) const noexcept {
            return other.chan_ != chan_;
        }

        iterator & operator++() {
            reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
            increment_();
            return * this;
        }

        const iterator operator++( int) = delete;

        reference_t operator*() noexcept {
            return * reinterpret_cast< value_type * >( std::addressof( storage_) );
        }

        pointer_t operator->() noexcept {
            return reinterpret_cast< value_type * >( std::addressof( storage_) );
        }
    };

    friend class iterator;
};

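// begin()/end() below make the channel usable in a range-based for loop: each
// iteration takes one value via value_pop(), and the loop ends once the
// channel has been closed (increment_() catches the fiber_error and the
// iterator compares equal to end()). A hedged usage sketch:
//
//   boost::fibers::unbuffered_channel< int > chan;
//   // ... values pushed and chan.close() called by another fiber ...
//   for ( int i : chan) {
//       // process i; the loop blocks between values and stops on close()
//   }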
template< typename T >
typename unbuffered_channel< T >::iterator
begin( unbuffered_channel< T > & chan) {
    return typename unbuffered_channel< T >::iterator( & chan);
}

template< typename T >
typename unbuffered_channel< T >::iterator
end( unbuffered_channel< T > &) {
    return typename unbuffered_channel< T >::iterator();
}

}}

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H