• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //          Copyright Nat Goodspeed 2014.
2 // Distributed under the Boost Software License, Version 1.0.
3 //    (See accompanying file LICENSE_1_0.txt or copy at
4 //          http://www.boost.org/LICENSE_1_0.txt)
5 
6 #include <chrono>
7 #include <condition_variable>
8 #include <iostream>
9 #include <mutex>
10 #include <algorithm>                // std::find_if()
11 
12 #include <boost/fiber/all.hpp>
13 #include <boost/fiber/scheduler.hpp>
14 
15 class Verbose {
16 public:
Verbose(std::string const & d,std::string const & s="stop")17     Verbose( std::string const& d, std::string const& s="stop") :
18         desc( d),
19         stop( s) {
20         std::cout << desc << " start" << std::endl;
21     }
22 
~Verbose()23     ~Verbose() {
24         std::cout << desc << ' ' << stop << std::endl;
25     }
26 
27     Verbose( Verbose const&) = delete;
28     Verbose & operator=( Verbose const&) = delete;
29 
30 private:
31     std::string     desc;
32     std::string     stop;
33 };
34 
35 //[priority_props
36 class priority_props : public boost::fibers::fiber_properties {
37 public:
priority_props(boost::fibers::context * ctx)38     priority_props( boost::fibers::context * ctx):
39         fiber_properties( ctx), /*< Your subclass constructor must accept a
40                                  [^[class_link context]*] and pass it to
41                                  the `fiber_properties` constructor. >*/
42         priority_( 0) {
43     }
44 
get_priority() const45     int get_priority() const {
46         return priority_; /*< Provide read access methods at your own discretion. >*/
47     }
48 
49     // Call this method to alter priority, because we must notify
50     // priority_scheduler of any change.
set_priority(int p)51     void set_priority( int p) { /*<
52             It's important to call `notify()` on any
53             change in a property that can affect the
54             scheduler's behavior. Therefore, such
55             modifications should only be performed
56             through an access method. >*/
57         // Of course, it's only worth reshuffling the queue and all if we're
58         // actually changing the priority.
59         if ( p != priority_) {
60             priority_ = p;
61             notify();
62         }
63     }
64 
65     // The fiber name of course is solely for purposes of this example
66     // program; it has nothing to do with implementing scheduler priority.
67     // This is a public data member -- not requiring set/get access methods --
68     // because we need not inform the scheduler of any change.
69     std::string name; /*< A property that does not affect the scheduler does
70                           not need access methods. >*/
71 private:
72     int priority_;
73 };
74 //]
75 
76 //[priority_scheduler
77 class priority_scheduler :
78     public boost::fibers::algo::algorithm_with_properties< priority_props > {
79 private:
80     typedef boost::fibers::scheduler::ready_queue_type/*< See [link ready_queue_t]. >*/   rqueue_t;
81 
82     rqueue_t                                rqueue_;
83     std::mutex                  mtx_{};
84     std::condition_variable     cnd_{};
85     bool                        flag_{ false };
86 
87 public:
priority_scheduler()88     priority_scheduler() :
89         rqueue_() {
90     }
91 
92     // For a subclass of algorithm_with_properties<>, it's important to
93     // override the correct awakened() overload.
94     /*<< You must override the [member_link algorithm_with_properties..awakened]
95          method. This is how your scheduler receives notification of a
96          fiber that has become ready to run. >>*/
awakened(boost::fibers::context * ctx,priority_props & props)97     virtual void awakened( boost::fibers::context * ctx, priority_props & props) noexcept {
98         int ctx_priority = props.get_priority(); /*< `props` is the instance of
99                                                    priority_props associated
100                                                    with the passed fiber `ctx`. >*/
101         // With this scheduler, fibers with higher priority values are
102         // preferred over fibers with lower priority values. But fibers with
103         // equal priority values are processed in round-robin fashion. So when
104         // we're handed a new context*, put it at the end of the fibers
105         // with that same priority. In other words: search for the first fiber
106         // in the queue with LOWER priority, and insert before that one.
107         rqueue_t::iterator i( std::find_if( rqueue_.begin(), rqueue_.end(),
108             [ctx_priority,this]( boost::fibers::context & c)
109             { return properties( &c ).get_priority() < ctx_priority; }));
110         // Now, whether or not we found a fiber with lower priority,
111         // insert this new fiber here.
112         rqueue_.insert( i, * ctx);
113 //<-
114 
115         std::cout << "awakened(" << props.name << "): ";
116         describe_ready_queue();
117 //->
118     }
119 
120     /*<< You must override the [member_link algorithm_with_properties..pick_next]
121          method. This is how your scheduler actually advises the fiber manager
122          of the next fiber to run. >>*/
pick_next()123     virtual boost::fibers::context * pick_next() noexcept {
124         // if ready queue is empty, just tell caller
125         if ( rqueue_.empty() ) {
126             return nullptr;
127         }
128         boost::fibers::context * ctx( & rqueue_.front() );
129         rqueue_.pop_front();
130 //<-
131         std::cout << "pick_next() resuming " << properties( ctx).name << ": ";
132         describe_ready_queue();
133 //->
134         return ctx;
135     }
136 
137     /*<< You must override [member_link algorithm_with_properties..has_ready_fibers]
138       to inform the fiber manager of the state of your ready queue. >>*/
has_ready_fibers() const139     virtual bool has_ready_fibers() const noexcept {
140         return ! rqueue_.empty();
141     }
142 
143     /*<< Overriding [member_link algorithm_with_properties..property_change]
144          is optional. This override handles the case in which the running
145          fiber changes the priority of another ready fiber: a fiber already in
146          our queue. In that case, move the updated fiber within the queue. >>*/
property_change(boost::fibers::context * ctx,priority_props & props)147     virtual void property_change( boost::fibers::context * ctx, priority_props & props) noexcept {
148         // Although our priority_props class defines multiple properties, only
149         // one of them (priority) actually calls notify() when changed. The
150         // point of a property_change() override is to reshuffle the ready
151         // queue according to the updated priority value.
152 //<-
153         std::cout << "property_change(" << props.name << '(' << props.get_priority()
154                   << ")): ";
155 //->
156 
157         // 'ctx' might not be in our queue at all, if caller is changing the
158         // priority of (say) the running fiber. If it's not there, no need to
159         // move it: we'll handle it next time it hits awakened().
160         if ( ! ctx->ready_is_linked()) { /*<
161             Your `property_change()` override must be able to
162             handle the case in which the passed `ctx` is not in
163             your ready queue. It might be running, or it might be
164             blocked. >*/
165 //<-
166             // hopefully user will distinguish this case by noticing that
167             // the fiber with which we were called does not appear in the
168             // ready queue at all
169             describe_ready_queue();
170 //->
171             return;
172         }
173 
174         // Found ctx: unlink it
175         ctx->ready_unlink();
176 
177         // Here we know that ctx was in our ready queue, but we've unlinked
178         // it. We happen to have a method that will (re-)add a context* to the
179         // right place in the ready queue.
180         awakened( ctx, props);
181     }
182 //<-
183 
describe_ready_queue()184     void describe_ready_queue() {
185         if ( rqueue_.empty() ) {
186             std::cout << "[empty]";
187         } else {
188             const char * delim = "";
189             for ( boost::fibers::context & ctx : rqueue_) {
190                 priority_props & props( properties( & ctx) );
191                 std::cout << delim << props.name << '(' << props.get_priority() << ')';
192                 delim = ", ";
193             }
194         }
195         std::cout << std::endl;
196     }
197 //->
198 
suspend_until(std::chrono::steady_clock::time_point const & time_point)199     void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
200         if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
201             std::unique_lock< std::mutex > lk( mtx_);
202             cnd_.wait( lk, [this](){ return flag_; });
203             flag_ = false;
204         } else {
205             std::unique_lock< std::mutex > lk( mtx_);
206             cnd_.wait_until( lk, time_point, [this](){ return flag_; });
207             flag_ = false;
208         }
209     }
210 
notify()211     void notify() noexcept {
212         std::unique_lock< std::mutex > lk( mtx_);
213         flag_ = true;
214         lk.unlock();
215         cnd_.notify_all();
216     }
217 };
218 //]
219 
220 //[launch
221 template< typename Fn >
launch(Fn && func,std::string const & name,int priority)222 boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) {
223     boost::fibers::fiber fiber( func);
224     priority_props & props( fiber.properties< priority_props >() );
225     props.name = name;
226     props.set_priority( priority);
227     return fiber;
228 }
229 //]
230 
yield_fn()231 void yield_fn() {
232     std::string name( boost::this_fiber::properties< priority_props >().name);
233     Verbose v( std::string("fiber ") + name);
234     for ( int i = 0; i < 3; ++i) {
235         std::cout << "fiber " << name << " yielding" << std::endl;
236         boost::this_fiber::yield();
237     }
238 }
239 
barrier_fn(boost::fibers::barrier & barrier)240 void barrier_fn( boost::fibers::barrier & barrier) {
241     std::string name( boost::this_fiber::properties< priority_props >().name);
242     Verbose v( std::string("fiber ") + name);
243     std::cout << "fiber " << name << " waiting on barrier" << std::endl;
244     barrier.wait();
245     std::cout << "fiber " << name << " yielding" << std::endl;
246     boost::this_fiber::yield();
247 }
248 
249 //[change_fn
change_fn(boost::fibers::fiber & other,int other_priority,boost::fibers::barrier & barrier)250 void change_fn( boost::fibers::fiber & other,
251                 int other_priority,
252                 boost::fibers::barrier& barrier) {
253     std::string name( boost::this_fiber::properties< priority_props >().name);
254     Verbose v( std::string("fiber ") + name);
255 
256 //<-
257     std::cout << "fiber " << name << " waiting on barrier" << std::endl;
258 //->
259     barrier.wait();
260     // We assume a couple things about 'other':
261     // - that it was also waiting on the same barrier
262     // - that it has lower priority than this fiber.
263     // If both are true, 'other' is now ready to run but is sitting in
264     // priority_scheduler's ready queue. Change its priority.
265     priority_props & other_props(
266             other.properties< priority_props >() );
267 //<-
268     std::cout << "fiber " << name << " changing priority of " << other_props.name
269               << " to " << other_priority << std::endl;
270 //->
271     other_props.set_priority( other_priority);
272 }
273 //]
274 
275 //[main
main(int argc,char * argv[])276 int main( int argc, char *argv[]) {
277     // make sure we use our priority_scheduler rather than default round_robin
278     boost::fibers::use_scheduling_algorithm< priority_scheduler >();
279 /*=    ...*/
280 /*=}*/
281 //]
282     Verbose v("main()");
283 
284     // for clarity
285     std::cout << "main() setting name" << std::endl;
286 //[main_name
287     boost::this_fiber::properties< priority_props >().name = "main";
288 //]
289     std::cout << "main() running tests" << std::endl;
290 
291     {
292         Verbose v("high-priority first", "stop\n");
293         // verify that high-priority fiber always gets scheduled first
294         boost::fibers::fiber low( launch( yield_fn, "low",    1) );
295         boost::fibers::fiber med( launch( yield_fn, "medium", 2) );
296         boost::fibers::fiber hi( launch( yield_fn,  "high",   3) );
297         std::cout << "main: high.join()" << std::endl;
298         hi.join();
299         std::cout << "main: medium.join()" << std::endl;
300         med.join();
301         std::cout << "main: low.join()" << std::endl;
302         low.join();
303     }
304 
305     {
306         Verbose v("same priority round-robin", "stop\n");
307         // fibers of same priority are scheduled in round-robin order
308         boost::fibers::fiber a( launch( yield_fn, "a", 0) );
309         boost::fibers::fiber b( launch( yield_fn, "b", 0) );
310         boost::fibers::fiber c( launch( yield_fn, "c", 0) );
311         std::cout << "main: a.join()" << std::endl;
312         a.join();
313         std::cout << "main: b.join()" << std::endl;
314         b.join();
315         std::cout << "main: c.join()" << std::endl;
316         c.join();
317     }
318 
319     {
320         Verbose v("barrier wakes up all", "stop\n");
321         // using a barrier wakes up all waiting fibers at the same time
322         boost::fibers::barrier barrier( 3);
323         boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low",    1) );
324         boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) );
325         boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); },  "high",   3) );
326         std::cout << "main: low.join()" << std::endl;
327         low.join();
328         std::cout << "main: medium.join()" << std::endl;
329         med.join();
330         std::cout << "main: high.join()" << std::endl;
331         hi.join();
332     }
333 
334     {
335         Verbose v("change priority", "stop\n");
336         // change priority of a fiber in priority_scheduler's ready queue
337         boost::fibers::barrier barrier( 3);
338         boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) );
339         boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) );
340         boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) );
341         std::cout << "main: a.join()" << std::endl;
342         std::cout << "main: a.join()" << std::endl;
343         a.join();
344         std::cout << "main: b.join()" << std::endl;
345         b.join();
346         std::cout << "main: c.join()" << std::endl;
347         c.join();
348     }
349 
350     std::cout << "done." << std::endl;
351 
352     return EXIT_SUCCESS;
353 }
354