• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //          Copyright Oliver Kowalke 2013.
2 // Distributed under the Boost Software License, Version 1.0.
3 //    (See accompanying file LICENSE_1_0.txt or copy at
4 //          http://www.boost.org/LICENSE_1_0.txt)
5 
6 #ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
7 #define BOOST_FIBERS_ASIO_ROUND_ROBIN_H
8 
9 #include <chrono>
10 #include <cstddef>
11 #include <memory>
12 #include <mutex>
13 #include <queue>
14 
15 #include <boost/asio.hpp>
16 #include <boost/assert.hpp>
17 #include <boost/asio/steady_timer.hpp>
18 #include <boost/config.hpp>
19 
20 #include <boost/fiber/condition_variable.hpp>
21 #include <boost/fiber/context.hpp>
22 #include <boost/fiber/mutex.hpp>
23 #include <boost/fiber/operations.hpp>
24 #include <boost/fiber/scheduler.hpp>
25 
26 #include "yield.hpp"
27 
28 #ifdef BOOST_HAS_ABI_HEADERS
29 #  include BOOST_ABI_PREFIX
30 #endif
31 
32 namespace boost {
33 namespace fibers {
34 namespace asio {
35 
36 class round_robin : public algo::algorithm {
37 private:
38 //[asio_rr_suspend_timer
39     std::shared_ptr< boost::asio::io_context >      io_ctx_;
40     boost::asio::steady_timer                       suspend_timer_;
41 //]
42     boost::fibers::scheduler::ready_queue_type      rqueue_{};
43     boost::fibers::mutex                            mtx_{};
44     boost::fibers::condition_variable               cnd_{};
45     std::size_t                                     counter_{ 0 };
46 
47 public:
48 //[asio_rr_service_top
49     struct service : public boost::asio::io_context::service {
50         static boost::asio::io_context::id                  id;
51 
52         std::unique_ptr< boost::asio::io_context::work >    work_;
53 
serviceboost::fibers::asio::round_robin::service54         service( boost::asio::io_context & io_ctx) :
55             boost::asio::io_context::service( io_ctx),
56             work_{ new boost::asio::io_context::work( io_ctx) } {
57         }
58 
~serviceboost::fibers::asio::round_robin::service59         virtual ~service() {}
60 
61         service( service const&) = delete;
62         service & operator=( service const&) = delete;
63 
shutdown_serviceboost::fibers::asio::round_robin::service64         void shutdown_service() override final {
65             work_.reset();
66         }
67     };
68 //]
69 
70 //[asio_rr_ctor
round_robin(std::shared_ptr<boost::asio::io_context> const & io_ctx_)71     round_robin( std::shared_ptr< boost::asio::io_context > const& io_ctx_) :
72         io_ctx_( io_ctx),
73         suspend_timer_( * io_ctx_) {
74         // We use add_service() very deliberately. This will throw
75         // service_already_exists if you pass the same io_context instance to
76         // more than one round_robin instance.
77         boost::asio::add_service( * io_ctx_, new service( * io_ctx_) );
78         boost::asio::post( * io_ctx_, [this]() mutable {
79 //]
80 //[asio_rr_service_lambda
81                 while ( ! io_ctx_->stopped() ) {
82                     if ( has_ready_fibers() ) {
83                         // run all pending handlers in round_robin
84                         while ( io_ctx_->poll() );
85                         // block this fiber till all pending (ready) fibers are processed
86                         // == round_robin::suspend_until() has been called
87                         std::unique_lock< boost::fibers::mutex > lk( mtx_);
88                         cnd_.wait( lk);
89                     } else {
90                         // run one handler inside io_context
91                         // if no handler available, block this thread
92                         if ( ! io_ctx_->run_one() ) {
93                             break;
94                         }
95                     }
96                }
97 //]
98             });
99     }
100 
awakened(context * ctx)101     void awakened( context * ctx) noexcept {
102         BOOST_ASSERT( nullptr != ctx);
103         BOOST_ASSERT( ! ctx->ready_is_linked() );
104         ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
105         if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
106             ++counter_;
107         }
108     }
109 
pick_next()110     context * pick_next() noexcept {
111         context * ctx( nullptr);
112         if ( ! rqueue_.empty() ) { /*<
113             pop an item from the ready queue
114         >*/
115             ctx = & rqueue_.front();
116             rqueue_.pop_front();
117             BOOST_ASSERT( nullptr != ctx);
118             BOOST_ASSERT( context::active() != ctx);
119             if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
120                 --counter_;
121             }
122         }
123         return ctx;
124     }
125 
has_ready_fibers() const126     bool has_ready_fibers() const noexcept {
127         return 0 < counter_;
128     }
129 
130 //[asio_rr_suspend_until
suspend_until(std::chrono::steady_clock::time_point const & abs_time)131     void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
132         // Set a timer so at least one handler will eventually fire, causing
133         // run_one() to eventually return.
134         if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
135 			// Each expires_at(time_point) call cancels any previous pending
136 			// call. We could inadvertently spin like this:
137 			// dispatcher calls suspend_until() with earliest wake time
138 			// suspend_until() sets suspend_timer_
139 			// lambda loop calls run_one()
140 			// some other asio handler runs before timer expires
141 			// run_one() returns to lambda loop
142 			// lambda loop yields to dispatcher
143 			// dispatcher finds no ready fibers
144 			// dispatcher calls suspend_until() with SAME wake time
145 			// suspend_until() sets suspend_timer_ to same time, canceling
146 			// previous async_wait()
147 			// lambda loop calls run_one()
148 			// asio calls suspend_timer_ handler with operation_aborted
149 			// run_one() returns to lambda loop... etc. etc.
150 			// So only actually set the timer when we're passed a DIFFERENT
151 			// abs_time value.
152             suspend_timer_.expires_at( abs_time);
153             suspend_timer_.async_wait([](boost::system::error_code const&){
154                                         this_fiber::yield();
155                                       });
156         }
157         cnd_.notify_one();
158     }
159 //]
160 
161 //[asio_rr_notify
notify()162     void notify() noexcept {
163         // Something has happened that should wake one or more fibers BEFORE
164         // suspend_timer_ expires. Reset the timer to cause it to fire
165         // immediately, causing the run_one() call to return. In theory we
166         // could use cancel() because we don't care whether suspend_timer_'s
167         // handler is called with operation_aborted or success. However --
168         // cancel() doesn't change the expiration time, and we use
169         // suspend_timer_'s expiration time to decide whether it's already
170         // set. If suspend_until() set some specific wake time, then notify()
171         // canceled it, then suspend_until() was called again with the same
172         // wake time, it would match suspend_timer_'s expiration time and we'd
173         // refrain from setting the timer. So instead of simply calling
174         // cancel(), reset the timer, which cancels the pending sleep AND sets
175         // a new expiration time. This will cause us to spin the loop twice --
176         // once for the operation_aborted handler, once for timer expiration
177         // -- but that shouldn't be a big problem.
178         suspend_timer_.async_wait([](boost::system::error_code const&){
179                                     this_fiber::yield();
180                                   });
181         suspend_timer_.expires_at( std::chrono::steady_clock::now() );
182     }
183 //]
184 };
185 
186 boost::asio::io_context::id round_robin::service::id;
187 
188 }}}
189 
190 #ifdef BOOST_HAS_ABI_HEADERS
191 #  include BOOST_ABI_SUFFIX
192 #endif
193 
194 #endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H
195