• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 //          Copyright Oliver Kowalke 2013.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6 
7 #include "boost/fiber/scheduler.hpp"
8 
9 #include <chrono>
10 #include <mutex>
11 
12 #include <boost/assert.hpp>
13 
14 #include "boost/fiber/algo/round_robin.hpp"
15 #include "boost/fiber/context.hpp"
16 #include "boost/fiber/exceptions.hpp"
17 
18 #ifdef BOOST_HAS_ABI_HEADERS
19 #  include BOOST_ABI_PREFIX
20 #endif
21 
22 namespace boost {
23 namespace fibers {
24 
25 void
release_terminated_()26 scheduler::release_terminated_() noexcept {
27     while ( ! terminated_queue_.empty() ) {
28         context * ctx = & terminated_queue_.front();
29         terminated_queue_.pop_front();
30         BOOST_ASSERT( ctx->is_context( type::worker_context) );
31         BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
32         BOOST_ASSERT( this == ctx->get_scheduler() );
33         BOOST_ASSERT( ctx->is_resumable() );
34         BOOST_ASSERT( ! ctx->worker_is_linked() );
35         BOOST_ASSERT( ! ctx->ready_is_linked() );
36 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
37         BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
38 #endif
39         BOOST_ASSERT( ! ctx->sleep_is_linked() );
40         BOOST_ASSERT( ! ctx->wait_is_linked() );
41         BOOST_ASSERT( ctx->wait_queue_.empty() );
42         BOOST_ASSERT( ctx->terminated_);
43         // if last reference, e.g. fiber::join() or fiber::detach()
44         // have been already called, this will call ~context(),
45         // the context is automatically removeid from worker-queue
46         intrusive_ptr_release( ctx);
47     }
48 }
49 
50 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
51 void
remote_ready2ready_()52 scheduler::remote_ready2ready_() noexcept {
53     remote_ready_queue_type tmp;
54     detail::spinlock_lock lk{ remote_ready_splk_ };
55     remote_ready_queue_.swap( tmp);
56     lk.unlock();
57     // get context from remote ready-queue
58     while ( ! tmp.empty() ) {
59         context * ctx = & tmp.front();
60         tmp.pop_front();
61         // ctx was signaled from remote (other thread)
62         // ctx might have been already resumed because of
63         // its wait-op. has been already timed out and
64         // thus it was already pushed to the ready-queue
65         if ( ! ctx->ready_is_linked() ) {
66             // store context in local queues
67             schedule( ctx);
68         }
69     }
70 }
71 #endif
72 
73 void
sleep2ready_()74 scheduler::sleep2ready_() noexcept {
75     // move context which the deadline has reached
76     // to ready-queue
77     // sleep-queue is sorted (ascending)
78     std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
79     sleep_queue_type::iterator e = sleep_queue_.end();
80     for ( sleep_queue_type::iterator i = sleep_queue_.begin(); i != e;) {
81         context * ctx = & ( * i);
82         // dipatcher context must never be pushed to sleep-queue
83         BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
84         BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() );
85         BOOST_ASSERT( ! ctx->ready_is_linked() );
86 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
87         BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
88 #endif
89         BOOST_ASSERT( ! ctx->terminated_is_linked() );
90         // set fiber to state_ready if deadline was reached
91         if ( ctx->tp_ <= now) {
92             // remove context from sleep-queue
93             i = sleep_queue_.erase( i);
94             // reset sleep-tp
95             ctx->tp_ = (std::chrono::steady_clock::time_point::max)();
96             std::intptr_t prev = ctx->twstatus.exchange( -2);
97             if ( static_cast< std::intptr_t >( -1) ==  prev) {
98                 // timed-wait op.: timeout after notify
99                 continue;
100             }
101             // prev == 0: no timed-wait op.
102             // prev == <any>: timed-wait op., timeout before notify
103             // store context in local queues
104             schedule( ctx);
105         } else {
106             break; // first context with now < deadline
107         }
108     }
109 }
110 
scheduler()111 scheduler::scheduler() noexcept :
112     algo_{ new algo::round_robin() } {
113 }
114 
~scheduler()115 scheduler::~scheduler() {
116     BOOST_ASSERT( nullptr != main_ctx_);
117     BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
118     BOOST_ASSERT( context::active() == main_ctx_);
119     // signal dispatcher-context termination
120     shutdown_ = true;
121     // resume pending fibers
122     // by joining dispatcher-context
123     dispatcher_ctx_->join();
124     // no context' in worker-queue
125     BOOST_ASSERT( worker_queue_.empty() );
126     BOOST_ASSERT( terminated_queue_.empty() );
127     BOOST_ASSERT( sleep_queue_.empty() );
128     // set active context to nullptr
129     context::reset_active();
130     // deallocate dispatcher-context
131     BOOST_ASSERT( ! dispatcher_ctx_->ready_is_linked() );
132     dispatcher_ctx_.reset();
133     // set main-context to nullptr
134     main_ctx_ = nullptr;
135 }
136 
137 boost::context::fiber
dispatch()138 scheduler::dispatch() noexcept {
139     BOOST_ASSERT( context::active() == dispatcher_ctx_);
140     for (;;) {
141         if ( shutdown_) {
142             // notify sched-algorithm about termination
143             algo_->notify();
144             if ( worker_queue_.empty() ) {
145                 break;
146             }
147         }
148         // release terminated context'
149         release_terminated_();
150 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
151         // get context' from remote ready-queue
152         remote_ready2ready_();
153 #endif
154         // get sleeping context'
155         // must be called after remote_ready2ready_()
156         sleep2ready_();
157         // get next ready context
158         context * ctx = algo_->pick_next();
159         if ( nullptr != ctx) {
160             BOOST_ASSERT( ctx->is_resumable() );
161             BOOST_ASSERT( ! ctx->ready_is_linked() );
162 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
163             BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
164 #endif
165             BOOST_ASSERT( ! ctx->sleep_is_linked() );
166             BOOST_ASSERT( ! ctx->terminated_is_linked() );
167             // no test for '! ctx->wait_is_linked()' because
168             // context is registered in wait-queue of sync. primitives
169             // via wait_for()/wait_until()
170             // push dispatcher-context to ready-queue
171             // so that ready-queue never becomes empty
172             ctx->resume( dispatcher_ctx_.get() );
173             BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
174         } else {
175             // no ready context, wait till signaled
176             // set deadline to highest value
177             std::chrono::steady_clock::time_point suspend_time =
178                     (std::chrono::steady_clock::time_point::max)();
179             // get lowest deadline from sleep-queue
180             sleep_queue_type::iterator i = sleep_queue_.begin();
181             if ( sleep_queue_.end() != i) {
182                 suspend_time = i->tp_;
183             }
184             // no ready context, wait till signaled
185             algo_->suspend_until( suspend_time);
186         }
187     }
188     // release termianted context'
189     release_terminated_();
190     // return to main-context
191     return main_ctx_->suspend_with_cc();
192 }
193 
194 void
schedule(context * ctx)195 scheduler::schedule( context * ctx) noexcept {
196     BOOST_ASSERT( nullptr != ctx);
197     BOOST_ASSERT( ! ctx->ready_is_linked() );
198 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
199     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
200 #endif
201     BOOST_ASSERT( ! ctx->terminated_is_linked() );
202     // remove context ctx from sleep-queue
203     // (might happen if blocked in timed_mutex::try_lock_until())
204     if ( ctx->sleep_is_linked() ) {
205         // unlink it from sleep-queue
206         ctx->sleep_unlink();
207     }
208     // push new context to ready-queue
209     algo_->awakened( ctx);
210 }
211 
212 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
213 void
schedule_from_remote(context * ctx)214 scheduler::schedule_from_remote( context * ctx) noexcept {
215     BOOST_ASSERT( nullptr != ctx);
216     // another thread might signal the main-context of this thread
217     BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
218     BOOST_ASSERT( this == ctx->get_scheduler() );
219     BOOST_ASSERT( ! ctx->ready_is_linked() );
220     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
221     BOOST_ASSERT( ! ctx->terminated_is_linked() );
222     BOOST_ASSERT( ! ctx->wait_is_linked() );
223     // protect for concurrent access
224     detail::spinlock_lock lk{ remote_ready_splk_ };
225     BOOST_ASSERT( ! shutdown_);
226     BOOST_ASSERT( nullptr != main_ctx_);
227     BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
228     // push new context to remote ready-queue
229     ctx->remote_ready_link( remote_ready_queue_);
230     lk.unlock();
231     // notify scheduler
232     algo_->notify();
233 }
234 #endif
235 
236 boost::context::fiber
terminate(detail::spinlock_lock & lk,context * ctx)237 scheduler::terminate( detail::spinlock_lock & lk, context * ctx) noexcept {
238     BOOST_ASSERT( nullptr != ctx);
239     BOOST_ASSERT( context::active() == ctx);
240     BOOST_ASSERT( this == ctx->get_scheduler() );
241     BOOST_ASSERT( ctx->is_context( type::worker_context) );
242     BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
243     BOOST_ASSERT( ! ctx->ready_is_linked() );
244 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
245     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
246 #endif
247     BOOST_ASSERT( ! ctx->sleep_is_linked() );
248     BOOST_ASSERT( ! ctx->terminated_is_linked() );
249     BOOST_ASSERT( ! ctx->wait_is_linked() );
250     BOOST_ASSERT( ctx->wait_queue_.empty() );
251     // store the terminated fiber in the terminated-queue
252     // the dispatcher-context will call
253     ctx->terminated_link( terminated_queue_);
254     // remove from the worker-queue
255     ctx->worker_unlink();
256     // release lock
257     lk.unlock();
258     // resume another fiber
259     return algo_->pick_next()->suspend_with_cc();
260 }
261 
262 void
yield(context * ctx)263 scheduler::yield( context * ctx) noexcept {
264     BOOST_ASSERT( nullptr != ctx);
265     BOOST_ASSERT( context::active() == ctx);
266     BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
267     BOOST_ASSERT( ! ctx->ready_is_linked() );
268 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
269     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
270 #endif
271     BOOST_ASSERT( ! ctx->sleep_is_linked() );
272     BOOST_ASSERT( ! ctx->terminated_is_linked() );
273     BOOST_ASSERT( ! ctx->wait_is_linked() );
274     // resume another fiber
275     algo_->pick_next()->resume( ctx);
276 }
277 
278 bool
wait_until(context * ctx,std::chrono::steady_clock::time_point const & sleep_tp)279 scheduler::wait_until( context * ctx,
280                        std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
281     BOOST_ASSERT( nullptr != ctx);
282     BOOST_ASSERT( context::active() == ctx);
283     BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
284     BOOST_ASSERT( ! ctx->ready_is_linked() );
285 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
286     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
287 #endif
288     BOOST_ASSERT( ! ctx->sleep_is_linked() );
289     BOOST_ASSERT( ! ctx->terminated_is_linked() );
290     BOOST_ASSERT( ! ctx->wait_is_linked() );
291     ctx->tp_ = sleep_tp;
292     ctx->sleep_link( sleep_queue_);
293     // resume another context
294     algo_->pick_next()->resume();
295     // context has been resumed
296     // check if deadline has reached
297     return std::chrono::steady_clock::now() < sleep_tp;
298 }
299 
300 bool
wait_until(context * ctx,std::chrono::steady_clock::time_point const & sleep_tp,detail::spinlock_lock & lk)301 scheduler::wait_until( context * ctx,
302                        std::chrono::steady_clock::time_point const& sleep_tp,
303                        detail::spinlock_lock & lk) noexcept {
304     BOOST_ASSERT( nullptr != ctx);
305     BOOST_ASSERT( context::active() == ctx);
306     BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
307     BOOST_ASSERT( ! ctx->ready_is_linked() );
308 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
309     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
310 #endif
311     BOOST_ASSERT( ! ctx->sleep_is_linked() );
312     BOOST_ASSERT( ! ctx->terminated_is_linked() );
313     // ctx->wait_is_linked() might return true
314     // if context was locked inside timed_mutex::try_lock_until()
315     // push active context to sleep-queue
316     ctx->tp_ = sleep_tp;
317     ctx->sleep_link( sleep_queue_);
318     // resume another context
319     algo_->pick_next()->resume( lk);
320     // context has been resumed
321     // check if deadline has reached
322     return std::chrono::steady_clock::now() < sleep_tp;
323 }
324 
325 void
suspend()326 scheduler::suspend() noexcept {
327     // resume another context
328     algo_->pick_next()->resume();
329 }
330 
331 void
suspend(detail::spinlock_lock & lk)332 scheduler::suspend( detail::spinlock_lock & lk) noexcept {
333     // resume another context
334     algo_->pick_next()->resume( lk);
335 }
336 
337 bool
has_ready_fibers() const338 scheduler::has_ready_fibers() const noexcept {
339     return algo_->has_ready_fibers();
340 }
341 
342 void
set_algo(algo::algorithm::ptr_t algo)343 scheduler::set_algo( algo::algorithm::ptr_t algo) noexcept {
344     // move remaining cotnext in current scheduler to new one
345     while ( algo_->has_ready_fibers() ) {
346         algo->awakened( algo_->pick_next() );
347     }
348     algo_ = std::move( algo);
349 }
350 
351 void
attach_main_context(context * ctx)352 scheduler::attach_main_context( context * ctx) noexcept {
353     BOOST_ASSERT( nullptr != ctx);
354     // main-context represents the execution context created
355     // by the system, e.g. main()- or thread-context
356     // should not be in worker-queue
357     main_ctx_ = ctx;
358     main_ctx_->scheduler_ = this;
359 }
360 
361 void
attach_dispatcher_context(intrusive_ptr<context> ctx)362 scheduler::attach_dispatcher_context( intrusive_ptr< context > ctx) noexcept {
363     BOOST_ASSERT( ctx);
364     // dispatcher context has to handle
365     //    - remote ready context'
366     //    - sleeping context'
367     //    - extern event-loops
368     //    - suspending the thread if ready-queue is empty (waiting on external event)
369     // should not be in worker-queue
370     dispatcher_ctx_.swap( ctx);
371     // add dispatcher-context to ready-queue
372     // so it is the first element in the ready-queue
373     // if the main context tries to suspend the first time
374     // the dispatcher-context is resumed and
375     // scheduler::dispatch() is executed
376     dispatcher_ctx_->scheduler_ = this;
377     algo_->awakened( dispatcher_ctx_.get() );
378 }
379 
380 void
attach_worker_context(context * ctx)381 scheduler::attach_worker_context( context * ctx) noexcept {
382     BOOST_ASSERT( nullptr != ctx);
383     BOOST_ASSERT( nullptr == ctx->get_scheduler() );
384     BOOST_ASSERT( ! ctx->ready_is_linked() );
385 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
386     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
387 #endif
388     BOOST_ASSERT( ! ctx->sleep_is_linked() );
389     BOOST_ASSERT( ! ctx->terminated_is_linked() );
390     BOOST_ASSERT( ! ctx->wait_is_linked() );
391     BOOST_ASSERT( ! ctx->worker_is_linked() );
392     ctx->worker_link( worker_queue_);
393     ctx->scheduler_ = this;
394     // an attached context must belong at least to worker-queue
395 }
396 
397 void
detach_worker_context(context * ctx)398 scheduler::detach_worker_context( context * ctx) noexcept {
399     BOOST_ASSERT( nullptr != ctx);
400     BOOST_ASSERT( ! ctx->ready_is_linked() );
401 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
402     BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
403 #endif
404     BOOST_ASSERT( ! ctx->sleep_is_linked() );
405     BOOST_ASSERT( ! ctx->terminated_is_linked() );
406     BOOST_ASSERT( ! ctx->wait_is_linked() );
407     BOOST_ASSERT( ctx->worker_is_linked() );
408     BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
409     ctx->worker_unlink();
410     BOOST_ASSERT( ! ctx->worker_is_linked() );
411     ctx->scheduler_ = nullptr;
412     // a detached context must not belong to any queue
413 }
414 
415 }}
416 
417 #ifdef BOOST_HAS_ABI_HEADERS
418 #  include BOOST_ABI_SUFFIX
419 #endif
420