• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 //          Copyright Oliver Kowalke 2016.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6 
7 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
9 
10 #include <atomic>
11 #include <chrono>
12 #include <cstddef>
13 #include <cstdint>
14 #include <memory>
15 #include <vector>
16 
17 #include <boost/config.hpp>
18 
19 #include <boost/fiber/channel_op_status.hpp>
20 #include <boost/fiber/context.hpp>
21 #include <boost/fiber/detail/config.hpp>
22 #include <boost/fiber/detail/convert.hpp>
23 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
24 #include <boost/fiber/detail/exchange.hpp>
25 #endif
26 #include <boost/fiber/detail/spinlock.hpp>
27 #include <boost/fiber/exceptions.hpp>
28 
29 #ifdef BOOST_HAS_ABI_HEADERS
30 #  include BOOST_ABI_PREFIX
31 #endif
32 
33 namespace boost {
34 namespace fibers {
35 
36 template< typename T >
37 class unbuffered_channel {
38 public:
39     using value_type = typename std::remove_reference<T>::type;
40 
41 private:
42     using wait_queue_type = context::wait_queue_t;
43 
44     struct slot {
45         value_type  value;
46         context *   ctx;
47 
slotboost::fibers::unbuffered_channel::slot48         slot( value_type const& value_, context * ctx_) :
49             value{ value_ },
50             ctx{ ctx_ } {
51         }
52 
slotboost::fibers::unbuffered_channel::slot53         slot( value_type && value_, context * ctx_) :
54             value{ std::move( value_) },
55             ctx{ ctx_ } {
56         }
57     };
58 
59     // shared cacheline
60     std::atomic< slot * >       slot_{ nullptr };
61     // shared cacheline
62     std::atomic_bool            closed_{ false };
63     mutable detail::spinlock    splk_producers_{};
64     wait_queue_type             waiting_producers_{};
65     mutable detail::spinlock    splk_consumers_{};
66     wait_queue_type             waiting_consumers_{};
67     char                        pad_[cacheline_length];
68 
is_empty_()69     bool is_empty_() {
70         return nullptr == slot_.load( std::memory_order_acquire);
71     }
72 
try_push_(slot * own_slot)73     bool try_push_( slot * own_slot) {
74         for (;;) {
75             slot * s = slot_.load( std::memory_order_acquire);
76             if ( nullptr == s) {
77                 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
78                     continue;
79                 }
80                 return true;
81             }
82             return false;
83         }
84     }
85 
try_pop_()86     slot * try_pop_() {
87         slot * nil_slot = nullptr;
88         for (;;) {
89             slot * s = slot_.load( std::memory_order_acquire);
90             if ( nullptr != s) {
91                 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
92                     continue;}
93             }
94             return s;
95         }
96     }
97 
98 public:
99     unbuffered_channel() = default;
100 
// Destroying the channel closes it first (see close()).
~unbuffered_channel() {
    this->close();
}
104 
105     unbuffered_channel( unbuffered_channel const&) = delete;
106     unbuffered_channel & operator=( unbuffered_channel const&) = delete;
107 
is_closed() const108     bool is_closed() const noexcept {
109         return closed_.load( std::memory_order_acquire);
110     }
111 
close()112     void close() noexcept {
113         context * active_ctx = context::active();
114         // set flag
115         if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
116             // notify current waiting
117             slot * s = slot_.load( std::memory_order_acquire);
118             if ( nullptr != s) {
119                 // notify context
120                 active_ctx->schedule( s->ctx);
121             }
122             // notify all waiting producers
123             detail::spinlock_lock lk1{ splk_producers_ };
124             while ( ! waiting_producers_.empty() ) {
125                 context * producer_ctx = & waiting_producers_.front();
126                 waiting_producers_.pop_front();
127                 auto expected = reinterpret_cast< std::intptr_t >( this);
128                 if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
129                     // notify context
130                     active_ctx->schedule( producer_ctx);
131                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
132                     // no timed-wait op.
133                     // notify context
134                     active_ctx->schedule( producer_ctx);
135                 }
136             }
137             // notify all waiting consumers
138             detail::spinlock_lock lk2{ splk_consumers_ };
139             while ( ! waiting_consumers_.empty() ) {
140                 context * consumer_ctx = & waiting_consumers_.front();
141                 waiting_consumers_.pop_front();
142                 auto expected = reinterpret_cast< std::intptr_t >( this);
143                 if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
144                     // notify context
145                     active_ctx->schedule( consumer_ctx);
146                 } else if ( static_cast< std::intptr_t >( 0) == expected) {
147                     // no timed-wait op.
148                     // notify context
149                     active_ctx->schedule( consumer_ctx);
150                 }
151             }
152         }
153     }
154 
push(value_type const & value)155     channel_op_status push( value_type const& value) {
156         context * active_ctx = context::active();
157         slot s{ value, active_ctx };
158         for (;;) {
159             if ( BOOST_UNLIKELY( is_closed() ) ) {
160                 return channel_op_status::closed;
161             }
162             if ( try_push_( & s) ) {
163                 detail::spinlock_lock lk{ splk_consumers_ };
164                 // notify one waiting consumer
165                 while ( ! waiting_consumers_.empty() ) {
166                     context * consumer_ctx = & waiting_consumers_.front();
167                     waiting_consumers_.pop_front();
168                     auto expected = reinterpret_cast< std::intptr_t >( this);
169                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
170                         // notify context
171                         active_ctx->schedule( consumer_ctx);
172                         break;
173                     }
174                     if ( static_cast< std::intptr_t >( 0) == expected) {
175                         // no timed-wait op.
176                         // notify context
177                         active_ctx->schedule( consumer_ctx);
178                         break;
179                     }
180                 }
181                 // suspend till value has been consumed
182                 active_ctx->suspend( lk);
183                 // resumed
184                 if ( nullptr == s.ctx) {
185                     // value has been consumed
186                     return channel_op_status::success;
187                 }
188                 // channel was closed before value was consumed
189                 return channel_op_status::closed;
190             }
191             detail::spinlock_lock lk{ splk_producers_ };
192             if ( BOOST_UNLIKELY( is_closed() ) ) {
193                 return channel_op_status::closed;
194             }
195             if ( is_empty_() ) {
196                 continue;
197             }
198             active_ctx->wait_link( waiting_producers_);
199             active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
200             // suspend this producer
201             active_ctx->suspend( lk);
202             // resumed, slot mabye free
203         }
204     }
205 
push(value_type && value)206     channel_op_status push( value_type && value) {
207         context * active_ctx = context::active();
208         slot s{ std::move( value), active_ctx };
209         for (;;) {
210             if ( BOOST_UNLIKELY( is_closed() ) ) {
211                 return channel_op_status::closed;
212             }
213             if ( try_push_( & s) ) {
214                 detail::spinlock_lock lk{ splk_consumers_ };
215                 // notify one waiting consumer
216                 while ( ! waiting_consumers_.empty() ) {
217                     context * consumer_ctx = & waiting_consumers_.front();
218                     waiting_consumers_.pop_front();
219                     auto expected = reinterpret_cast< std::intptr_t >( this);
220                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
221                         // notify context
222                         active_ctx->schedule( consumer_ctx);
223                         break;
224                     } if ( static_cast< std::intptr_t >( 0) == expected) {
225                         // no timed-wait op.
226                         // notify context
227                         active_ctx->schedule( consumer_ctx);
228                         break;
229                     }
230                 }
231                 // suspend till value has been consumed
232                 active_ctx->suspend( lk);
233                 // resumed
234                 if ( nullptr == s.ctx) {
235                     // value has been consumed
236                     return channel_op_status::success;
237                 }
238                 // channel was closed before value was consumed
239                 return channel_op_status::closed;
240             }
241             detail::spinlock_lock lk{ splk_producers_ };
242             if ( BOOST_UNLIKELY( is_closed() ) ) {
243                 return channel_op_status::closed;
244             }
245             if ( is_empty_() ) {
246                 continue;
247             }
248             active_ctx->wait_link( waiting_producers_);
249             active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
250             // suspend this producer
251             active_ctx->suspend( lk);
252             // resumed, slot mabye free
253         }
254     }
255 
256     template< typename Rep, typename Period >
push_wait_for(value_type const & value,std::chrono::duration<Rep,Period> const & timeout_duration)257     channel_op_status push_wait_for( value_type const& value,
258                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
259         return push_wait_until( value,
260                                 std::chrono::steady_clock::now() + timeout_duration);
261     }
262 
263     template< typename Rep, typename Period >
push_wait_for(value_type && value,std::chrono::duration<Rep,Period> const & timeout_duration)264     channel_op_status push_wait_for( value_type && value,
265                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
266         return push_wait_until( std::forward< value_type >( value),
267                                 std::chrono::steady_clock::now() + timeout_duration);
268     }
269 
270     template< typename Clock, typename Duration >
push_wait_until(value_type const & value,std::chrono::time_point<Clock,Duration> const & timeout_time_)271     channel_op_status push_wait_until( value_type const& value,
272                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
273         context * active_ctx = context::active();
274         slot s{ value, active_ctx };
275         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
276         for (;;) {
277             if ( BOOST_UNLIKELY( is_closed() ) ) {
278                 return channel_op_status::closed;
279             }
280             if ( try_push_( & s) ) {
281                 detail::spinlock_lock lk{ splk_consumers_ };
282                 // notify one waiting consumer
283                 while ( ! waiting_consumers_.empty() ) {
284                     context * consumer_ctx = & waiting_consumers_.front();
285                     waiting_consumers_.pop_front();
286                     auto expected = reinterpret_cast< std::intptr_t >( this);
287                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
288                         // notify context
289                         active_ctx->schedule( consumer_ctx);
290                         break;
291                     }
292                     if ( static_cast< std::intptr_t >( 0) == expected) {
293                         // no timed-wait op.
294                         // notify context
295                         active_ctx->schedule( consumer_ctx);
296                         break;
297                     }
298                 }
299                 // suspend this producer
300                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
301                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
302                     // clear slot
303                     slot * nil_slot = nullptr, * own_slot = & s;
304                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
305                     // resumed, value has not been consumed
306                     return channel_op_status::timeout;
307                 }
308                 // resumed
309                 if ( nullptr == s.ctx) {
310                     // value has been consumed
311                     return channel_op_status::success;
312                 }
313                 // channel was closed before value was consumed
314                 return channel_op_status::closed;
315             }
316             detail::spinlock_lock lk{ splk_producers_ };
317             if ( BOOST_UNLIKELY( is_closed() ) ) {
318                 return channel_op_status::closed;
319             }
320             if ( is_empty_() ) {
321                 continue;
322             }
323             active_ctx->wait_link( waiting_producers_);
324             active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
325             // suspend this producer
326             if ( ! active_ctx->wait_until( timeout_time, lk) ) {
327                 // relock local lk
328                 lk.lock();
329                 // remove from waiting-queue
330                 waiting_producers_.remove( * active_ctx);
331                 return channel_op_status::timeout;
332             }
333             // resumed, slot maybe free
334         }
335     }
336 
337     template< typename Clock, typename Duration >
push_wait_until(value_type && value,std::chrono::time_point<Clock,Duration> const & timeout_time_)338     channel_op_status push_wait_until( value_type && value,
339                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
340         context * active_ctx = context::active();
341         slot s{ std::move( value), active_ctx };
342         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
343         for (;;) {
344             if ( BOOST_UNLIKELY( is_closed() ) ) {
345                 return channel_op_status::closed;
346             }
347             if ( try_push_( & s) ) {
348                 detail::spinlock_lock lk{ splk_consumers_ };
349                 // notify one waiting consumer
350                 while ( ! waiting_consumers_.empty() ) {
351                     context * consumer_ctx = & waiting_consumers_.front();
352                     waiting_consumers_.pop_front();
353                     auto expected = reinterpret_cast< std::intptr_t >( this);
354                     if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
355                         // notify context
356                         active_ctx->schedule( consumer_ctx);
357                         break;
358                     } if ( static_cast< std::intptr_t >( 0) == expected) {
359                         // no timed-wait op.
360                         // notify context
361                         active_ctx->schedule( consumer_ctx);
362                         break;
363                     }
364                 }
365                 // suspend this producer
366                 active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
367                 if ( ! active_ctx->wait_until( timeout_time, lk) ) {
368                     // clear slot
369                     slot * nil_slot = nullptr, * own_slot = & s;
370                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
371                     // resumed, value has not been consumed
372                     return channel_op_status::timeout;
373                 }
374                 // resumed
375                 if ( nullptr == s.ctx) {
376                     // value has been consumed
377                     return channel_op_status::success;
378                 }
379                 // channel was closed before value was consumed
380                 return channel_op_status::closed;
381             }
382             detail::spinlock_lock lk{ splk_producers_ };
383             if ( BOOST_UNLIKELY( is_closed() ) ) {
384                 return channel_op_status::closed;
385             }
386             if ( is_empty_() ) {
387                 continue;
388             }
389             active_ctx->wait_link( waiting_producers_);
390             active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
391             // suspend this producer
392             if ( ! active_ctx->wait_until( timeout_time, lk) ) {
393                 // relock local lk
394                 lk.lock();
395                 // remove from waiting-queue
396                 waiting_producers_.remove( * active_ctx);
397                 return channel_op_status::timeout;
398             }
399             // resumed, slot maybe free
400         }
401     }
402 
pop(value_type & value)403     channel_op_status pop( value_type & value) {
404         context * active_ctx = context::active();
405         slot * s = nullptr;
406         for (;;) {
407             if ( nullptr != ( s = try_pop_() ) ) {
408                 {
409                     detail::spinlock_lock lk{ splk_producers_ };
410                     // notify one waiting producer
411                     while ( ! waiting_producers_.empty() ) {
412                         context * producer_ctx = & waiting_producers_.front();
413                         waiting_producers_.pop_front();
414                         auto expected = reinterpret_cast< std::intptr_t >( this);
415                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
416                             lk.unlock();
417                             // notify context
418                             active_ctx->schedule( producer_ctx);
419                             break;
420                         } if ( static_cast< std::intptr_t >( 0) == expected) {
421                             lk.unlock();
422                             // no timed-wait op.
423                             // notify context
424                             active_ctx->schedule( producer_ctx);
425                             break;
426                         }
427                     }
428                 }
429                 value = std::move( s->value);
430                 // notify context
431 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
432                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
433 #else
434                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
435 #endif
436                 return channel_op_status::success;
437             }
438             detail::spinlock_lock lk{ splk_consumers_ };
439             if ( BOOST_UNLIKELY( is_closed() ) ) {
440                 return channel_op_status::closed;
441             }
442             if ( ! is_empty_() ) {
443                 continue;
444             }
445             active_ctx->wait_link( waiting_consumers_);
446             active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
447             // suspend this consumer
448             active_ctx->suspend( lk);
449             // resumed, slot mabye set
450         }
451     }
452 
value_pop()453     value_type value_pop() {
454         context * active_ctx = context::active();
455         slot * s = nullptr;
456         for (;;) {
457             if ( nullptr != ( s = try_pop_() ) ) {
458                 {
459                     detail::spinlock_lock lk{ splk_producers_ };
460                     // notify one waiting producer
461                     while ( ! waiting_producers_.empty() ) {
462                         context * producer_ctx = & waiting_producers_.front();
463                         waiting_producers_.pop_front();
464                         auto expected = reinterpret_cast< std::intptr_t >( this);
465                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
466                             lk.unlock();
467                             // notify context
468                             active_ctx->schedule( producer_ctx);
469                             break;
470                         } if ( static_cast< std::intptr_t >( 0) == expected) {
471                             lk.unlock();
472                             // no timed-wait op.
473                             // notify context
474                             active_ctx->schedule( producer_ctx);
475                             break;
476                         }
477                     }
478                 }
479                 // consume value
480                 value_type value = std::move( s->value);
481                 // notify context
482 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
483                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
484 #else
485                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
486 #endif
487                 return std::move( value);
488             }
489             detail::spinlock_lock lk{ splk_consumers_ };
490             if ( BOOST_UNLIKELY( is_closed() ) ) {
491                 throw fiber_error{
492                         std::make_error_code( std::errc::operation_not_permitted),
493                         "boost fiber: channel is closed" };
494             }
495             if ( ! is_empty_() ) {
496                 continue;
497             }
498             active_ctx->wait_link( waiting_consumers_);
499             active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
500             // suspend this consumer
501             active_ctx->suspend( lk);
502             // resumed, slot mabye set
503         }
504     }
505 
506     template< typename Rep, typename Period >
pop_wait_for(value_type & value,std::chrono::duration<Rep,Period> const & timeout_duration)507     channel_op_status pop_wait_for( value_type & value,
508                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
509         return pop_wait_until( value,
510                                std::chrono::steady_clock::now() + timeout_duration);
511     }
512 
513     template< typename Clock, typename Duration >
pop_wait_until(value_type & value,std::chrono::time_point<Clock,Duration> const & timeout_time_)514     channel_op_status pop_wait_until( value_type & value,
515                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
516         context * active_ctx = context::active();
517         slot * s = nullptr;
518         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
519         for (;;) {
520             if ( nullptr != ( s = try_pop_() ) ) {
521                 {
522                     detail::spinlock_lock lk{ splk_producers_ };
523                     // notify one waiting producer
524                     while ( ! waiting_producers_.empty() ) {
525                         context * producer_ctx = & waiting_producers_.front();
526                         waiting_producers_.pop_front();
527                         auto expected = reinterpret_cast< std::intptr_t >( this);
528                         if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
529                             lk.unlock();
530                             // notify context
531                             active_ctx->schedule( producer_ctx);
532                             break;
533                         }
534                         if ( static_cast< std::intptr_t >( 0) == expected) {
535                             lk.unlock();
536                             // no timed-wait op.
537                             // notify context
538                             active_ctx->schedule( producer_ctx);
539                             break;
540                         }
541                     }
542                 }
543                 // consume value
544                 value = std::move( s->value);
545                 // notify context
546 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
547                 active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
548 #else
549                 active_ctx->schedule( std::exchange( s->ctx, nullptr) );
550 #endif
551                 return channel_op_status::success;
552             }
553             detail::spinlock_lock lk{ splk_consumers_ };
554             if ( BOOST_UNLIKELY( is_closed() ) ) {
555                 return channel_op_status::closed;
556             }
557             if ( ! is_empty_() ) {
558                 continue;
559             }
560             active_ctx->wait_link( waiting_consumers_);
561             active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
562             // suspend this consumer
563             if ( ! active_ctx->wait_until( timeout_time, lk) ) {
564                 // relock local lk
565                 lk.lock();
566                 // remove from waiting-queue
567                 waiting_consumers_.remove( * active_ctx);
568                 return channel_op_status::timeout;
569             }
570         }
571     }
572 
573     class iterator {
574     private:
575         typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type  storage_type;
576 
577         unbuffered_channel  *   chan_{ nullptr };
578         storage_type            storage_;
579 
increment_()580         void increment_() {
581             BOOST_ASSERT( nullptr != chan_);
582             try {
583                 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
584             } catch ( fiber_error const&) {
585                 chan_ = nullptr;
586             }
587         }
588 
589     public:
590         using iterator_category = std::input_iterator_tag;
591         using difference_type = std::ptrdiff_t;
592         using pointer = value_type *;
593         using reference = value_type &;
594 
595         using pointer_t = pointer;
596         using reference_t = reference;
597 
598         iterator() noexcept = default;
599 
iterator(unbuffered_channel<T> * chan)600         explicit iterator( unbuffered_channel< T > * chan) noexcept :
601             chan_{ chan } {
602             increment_();
603         }
604 
iterator(iterator const & other)605         iterator( iterator const& other) noexcept :
606             chan_{ other.chan_ } {
607         }
608 
operator =(iterator const & other)609         iterator & operator=( iterator const& other) noexcept {
610             if ( this == & other) return * this;
611             chan_ = other.chan_;
612             return * this;
613         }
614 
operator ==(iterator const & other) const615         bool operator==( iterator const& other) const noexcept {
616             return other.chan_ == chan_;
617         }
618 
operator !=(iterator const & other) const619         bool operator!=( iterator const& other) const noexcept {
620             return other.chan_ != chan_;
621         }
622 
operator ++()623         iterator & operator++() {
624             reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
625             increment_();
626             return * this;
627         }
628 
629         const iterator operator++( int) = delete;
630 
operator *()631         reference_t operator*() noexcept {
632             return * reinterpret_cast< value_type * >( std::addressof( storage_) );
633         }
634 
operator ->()635         pointer_t operator->() noexcept {
636             return reinterpret_cast< value_type * >( std::addressof( storage_) );
637         }
638     };
639 
640     friend class iterator;
641 };
642 
643 template< typename T >
644 typename unbuffered_channel< T >::iterator
begin(unbuffered_channel<T> & chan)645 begin( unbuffered_channel< T > & chan) {
646     return typename unbuffered_channel< T >::iterator( & chan);
647 }
648 
649 template< typename T >
650 typename unbuffered_channel< T >::iterator
end(unbuffered_channel<T> &)651 end( unbuffered_channel< T > &) {
652     return typename unbuffered_channel< T >::iterator();
653 }
654 
655 }}
656 
657 #ifdef BOOST_HAS_ABI_HEADERS
658 #  include BOOST_ABI_SUFFIX
659 #endif
660 
661 #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H
662