1 // Copyright Oliver Kowalke 2013. 2 // Distributed under the Boost Software License, Version 1.0. 3 // (See accompanying file LICENSE_1_0.txt or copy at 4 // http://www.boost.org/LICENSE_1_0.txt) 5 6 #ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H 7 #define BOOST_FIBERS_ASIO_ROUND_ROBIN_H 8 9 #include <chrono> 10 #include <cstddef> 11 #include <memory> 12 #include <mutex> 13 #include <queue> 14 15 #include <boost/asio.hpp> 16 #include <boost/assert.hpp> 17 #include <boost/asio/steady_timer.hpp> 18 #include <boost/config.hpp> 19 20 #include <boost/fiber/condition_variable.hpp> 21 #include <boost/fiber/context.hpp> 22 #include <boost/fiber/mutex.hpp> 23 #include <boost/fiber/operations.hpp> 24 #include <boost/fiber/scheduler.hpp> 25 26 #include "yield.hpp" 27 28 #ifdef BOOST_HAS_ABI_HEADERS 29 # include BOOST_ABI_PREFIX 30 #endif 31 32 namespace boost { 33 namespace fibers { 34 namespace asio { 35 36 class round_robin : public algo::algorithm { 37 private: 38 //[asio_rr_suspend_timer 39 std::shared_ptr< boost::asio::io_context > io_ctx_; 40 boost::asio::steady_timer suspend_timer_; 41 //] 42 boost::fibers::scheduler::ready_queue_type rqueue_{}; 43 boost::fibers::mutex mtx_{}; 44 boost::fibers::condition_variable cnd_{}; 45 std::size_t counter_{ 0 }; 46 47 public: 48 //[asio_rr_service_top 49 struct service : public boost::asio::io_context::service { 50 static boost::asio::io_context::id id; 51 52 std::unique_ptr< boost::asio::io_context::work > work_; 53 serviceboost::fibers::asio::round_robin::service54 service( boost::asio::io_context & io_ctx) : 55 boost::asio::io_context::service( io_ctx), 56 work_{ new boost::asio::io_context::work( io_ctx) } { 57 } 58 ~serviceboost::fibers::asio::round_robin::service59 virtual ~service() {} 60 61 service( service const&) = delete; 62 service & operator=( service const&) = delete; 63 shutdown_serviceboost::fibers::asio::round_robin::service64 void shutdown_service() override final { 65 work_.reset(); 66 } 67 }; 68 //] 69 70 //[asio_rr_ctor round_robin(std::shared_ptr<boost::asio::io_context> const & io_ctx_)71 round_robin( std::shared_ptr< boost::asio::io_context > const& io_ctx_) : 72 io_ctx_( io_ctx), 73 suspend_timer_( * io_ctx_) { 74 // We use add_service() very deliberately. This will throw 75 // service_already_exists if you pass the same io_context instance to 76 // more than one round_robin instance. 77 boost::asio::add_service( * io_ctx_, new service( * io_ctx_) ); 78 boost::asio::post( * io_ctx_, [this]() mutable { 79 //] 80 //[asio_rr_service_lambda 81 while ( ! io_ctx_->stopped() ) { 82 if ( has_ready_fibers() ) { 83 // run all pending handlers in round_robin 84 while ( io_ctx_->poll() ); 85 // block this fiber till all pending (ready) fibers are processed 86 // == round_robin::suspend_until() has been called 87 std::unique_lock< boost::fibers::mutex > lk( mtx_); 88 cnd_.wait( lk); 89 } else { 90 // run one handler inside io_context 91 // if no handler available, block this thread 92 if ( ! io_ctx_->run_one() ) { 93 break; 94 } 95 } 96 } 97 //] 98 }); 99 } 100 awakened(context * ctx)101 void awakened( context * ctx) noexcept { 102 BOOST_ASSERT( nullptr != ctx); 103 BOOST_ASSERT( ! ctx->ready_is_linked() ); 104 ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/ 105 if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { 106 ++counter_; 107 } 108 } 109 pick_next()110 context * pick_next() noexcept { 111 context * ctx( nullptr); 112 if ( ! rqueue_.empty() ) { /*< 113 pop an item from the ready queue 114 >*/ 115 ctx = & rqueue_.front(); 116 rqueue_.pop_front(); 117 BOOST_ASSERT( nullptr != ctx); 118 BOOST_ASSERT( context::active() != ctx); 119 if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { 120 --counter_; 121 } 122 } 123 return ctx; 124 } 125 has_ready_fibers() const126 bool has_ready_fibers() const noexcept { 127 return 0 < counter_; 128 } 129 130 //[asio_rr_suspend_until suspend_until(std::chrono::steady_clock::time_point const & abs_time)131 void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept { 132 // Set a timer so at least one handler will eventually fire, causing 133 // run_one() to eventually return. 134 if ( (std::chrono::steady_clock::time_point::max)() != abs_time) { 135 // Each expires_at(time_point) call cancels any previous pending 136 // call. We could inadvertently spin like this: 137 // dispatcher calls suspend_until() with earliest wake time 138 // suspend_until() sets suspend_timer_ 139 // lambda loop calls run_one() 140 // some other asio handler runs before timer expires 141 // run_one() returns to lambda loop 142 // lambda loop yields to dispatcher 143 // dispatcher finds no ready fibers 144 // dispatcher calls suspend_until() with SAME wake time 145 // suspend_until() sets suspend_timer_ to same time, canceling 146 // previous async_wait() 147 // lambda loop calls run_one() 148 // asio calls suspend_timer_ handler with operation_aborted 149 // run_one() returns to lambda loop... etc. etc. 150 // So only actually set the timer when we're passed a DIFFERENT 151 // abs_time value. 152 suspend_timer_.expires_at( abs_time); 153 suspend_timer_.async_wait([](boost::system::error_code const&){ 154 this_fiber::yield(); 155 }); 156 } 157 cnd_.notify_one(); 158 } 159 //] 160 161 //[asio_rr_notify notify()162 void notify() noexcept { 163 // Something has happened that should wake one or more fibers BEFORE 164 // suspend_timer_ expires. Reset the timer to cause it to fire 165 // immediately, causing the run_one() call to return. In theory we 166 // could use cancel() because we don't care whether suspend_timer_'s 167 // handler is called with operation_aborted or success. However -- 168 // cancel() doesn't change the expiration time, and we use 169 // suspend_timer_'s expiration time to decide whether it's already 170 // set. If suspend_until() set some specific wake time, then notify() 171 // canceled it, then suspend_until() was called again with the same 172 // wake time, it would match suspend_timer_'s expiration time and we'd 173 // refrain from setting the timer. So instead of simply calling 174 // cancel(), reset the timer, which cancels the pending sleep AND sets 175 // a new expiration time. This will cause us to spin the loop twice -- 176 // once for the operation_aborted handler, once for timer expiration 177 // -- but that shouldn't be a big problem. 178 suspend_timer_.async_wait([](boost::system::error_code const&){ 179 this_fiber::yield(); 180 }); 181 suspend_timer_.expires_at( std::chrono::steady_clock::now() ); 182 } 183 //] 184 }; 185 186 boost::asio::io_context::id round_robin::service::id; 187 188 }}} 189 190 #ifdef BOOST_HAS_ABI_HEADERS 191 # include BOOST_ABI_SUFFIX 192 #endif 193 194 #endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H 195