1
2 // Copyright Oliver Kowalke 2015.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #include "boost/fiber/algo/work_stealing.hpp"
9
10 #include <random>
11
12 #include <boost/assert.hpp>
13 #include <boost/context/detail/prefetch.hpp>
14
15 #include "boost/fiber/detail/thread_barrier.hpp"
16 #include "boost/fiber/type.hpp"
17
18 #ifdef BOOST_HAS_ABI_HEADERS
19 # include BOOST_ABI_PREFIX
20 #endif
21
22 namespace boost {
23 namespace fibers {
24 namespace algo {
25
26 std::atomic< std::uint32_t > work_stealing::counter_{ 0 };
27 std::vector< intrusive_ptr< work_stealing > > work_stealing::schedulers_{};
28
29 void
init_(std::uint32_t thread_count,std::vector<intrusive_ptr<work_stealing>> & schedulers)30 work_stealing::init_( std::uint32_t thread_count,
31 std::vector< intrusive_ptr< work_stealing > > & schedulers) {
32 // resize array of schedulers to thread_count, initilized with nullptr
33 std::vector< intrusive_ptr< work_stealing > >{ thread_count, nullptr }.swap( schedulers);
34 }
35
work_stealing(std::uint32_t thread_count,bool suspend)36 work_stealing::work_stealing( std::uint32_t thread_count, bool suspend) :
37 id_{ counter_++ },
38 thread_count_{ thread_count },
39 suspend_{ suspend } {
40 static boost::fibers::detail::thread_barrier b{ thread_count };
41 // initialize the array of schedulers
42 static std::once_flag flag;
43 std::call_once( flag, & work_stealing::init_, thread_count_, std::ref( schedulers_) );
44 // register pointer of this scheduler
45 schedulers_[id_] = this;
46 b.wait();
47 }
48
49 void
awakened(context * ctx)50 work_stealing::awakened( context * ctx) noexcept {
51 if ( ! ctx->is_context( type::pinned_context) ) {
52 ctx->detach();
53 }
54 rqueue_.push( ctx);
55 }
56
57 context *
pick_next()58 work_stealing::pick_next() noexcept {
59 context * victim = rqueue_.pop();
60 if ( nullptr != victim) {
61 boost::context::detail::prefetch_range( victim, sizeof( context) );
62 if ( ! victim->is_context( type::pinned_context) ) {
63 context::active()->attach( victim);
64 }
65 } else {
66 std::uint32_t id = 0;
67 std::size_t count = 0, size = schedulers_.size();
68 static thread_local std::minstd_rand generator{ std::random_device{}() };
69 std::uniform_int_distribution< std::uint32_t > distribution{
70 0, static_cast< std::uint32_t >( thread_count_ - 1) };
71 do {
72 do {
73 ++count;
74 // random selection of one logical cpu
75 // that belongs to the local NUMA node
76 id = distribution( generator);
77 // prevent stealing from own scheduler
78 } while ( id == id_);
79 // steal context from other scheduler
80 victim = schedulers_[id]->steal();
81 } while ( nullptr == victim && count < size);
82 if ( nullptr != victim) {
83 boost::context::detail::prefetch_range( victim, sizeof( context) );
84 BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
85 context::active()->attach( victim);
86 }
87 }
88 return victim;
89 }
90
91 void
suspend_until(std::chrono::steady_clock::time_point const & time_point)92 work_stealing::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
93 if ( suspend_) {
94 if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
95 std::unique_lock< std::mutex > lk{ mtx_ };
96 cnd_.wait( lk, [this](){ return flag_; });
97 flag_ = false;
98 } else {
99 std::unique_lock< std::mutex > lk{ mtx_ };
100 cnd_.wait_until( lk, time_point, [this](){ return flag_; });
101 flag_ = false;
102 }
103 }
104 }
105
106 void
notify()107 work_stealing::notify() noexcept {
108 if ( suspend_) {
109 std::unique_lock< std::mutex > lk{ mtx_ };
110 flag_ = true;
111 lk.unlock();
112 cnd_.notify_all();
113 }
114 }
115
116 }}}
117
118 #ifdef BOOST_HAS_ABI_HEADERS
119 # include BOOST_ABI_SUFFIX
120 #endif
121