• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //          Copyright Nat Goodspeed 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 //    (See accompanying file LICENSE_1_0.txt or copy at
4 //          http://www.boost.org/LICENSE_1_0.txt)
5 //
6 #include <algorithm>
7 #include <cassert>
8 #include <chrono>
9 #include <iostream>
10 #include <memory>
11 #include <sstream>
12 #include <string>
13 #include <type_traits>
14 #include <utility>
15 #include <vector>
16 
17 #include <boost/fiber/all.hpp>
18 #include <boost/variant/variant.hpp>
19 #include <boost/variant/get.hpp>
20 
21 // These are wait_something() functions rather than when_something()
22 // functions. A big part of the point of the Fiber library is to model
23 // sequencing using the processor's instruction pointer rather than chains of
24 // callbacks. The future-oriented when_all() / when_any() functions are still
25 // based on chains of callbacks. With Fiber, we can do better.
26 
27 /*****************************************************************************
28 *   Verbose
29 *****************************************************************************/
// Scope tracer: writes "<desc> start" to std::cout when constructed and
// "<desc> stop" when destroyed.
class Verbose {
public:
    Verbose( std::string const& d) :
        desc( d) {
        announce( " start");
    }

    ~Verbose() {
        announce( " stop");
    }

    // not copyable
    Verbose( Verbose const&) = delete;
    Verbose & operator=( Verbose const&) = delete;

private:
    // emit the description followed by 'suffix', ending the line with endl
    void announce( char const* suffix) const {
        std::cout << desc << suffix << std::endl;
    }

    const std::string desc;
};
47 
48 /*****************************************************************************
49 *   Runner and Example
50 *****************************************************************************/
51 // collect and ultimately run every Example
52 class Runner {
53     typedef std::vector< std::pair< std::string, std::function< void() > > >    function_list;
54 
55 public:
add(std::string const & desc,std::function<void ()> const & func)56     void add( std::string const& desc, std::function< void() > const& func) {
57         functions_.push_back( function_list::value_type( desc, func) );
58     }
59 
run()60     void run() {
61         for ( function_list::value_type const& pair : functions_) {
62             Verbose v( pair.first);
63             pair.second();
64         }
65     }
66 
67 private:
68     function_list   functions_;
69 };
70 
71 Runner runner;
72 
73 // Example allows us to embed Runner::add() calls at module scope
74 struct Example {
ExampleExample75     Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
76         runner.add( desc, func);
77     }
78 };
79 
80 /*****************************************************************************
81 *   example task functions
82 *****************************************************************************/
83 //[wait_sleeper
84 template< typename T >
sleeper_impl(T item,int ms,bool thrw=false)85 T sleeper_impl( T item, int ms, bool thrw = false) {
86     std::ostringstream descb, funcb;
87     descb << item;
88     std::string desc( descb.str() );
89     funcb << "  sleeper(" << item << ")";
90     Verbose v( funcb.str() );
91 
92     boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
93     if ( thrw) {
94         throw std::runtime_error( desc);
95     }
96     return item;
97 }
98 //]
99 
100 inline
sleeper(std::string const & item,int ms,bool thrw=false)101 std::string sleeper( std::string const& item, int ms, bool thrw = false) {
102     return sleeper_impl( item, ms, thrw);
103 }
104 
105 inline
sleeper(double item,int ms,bool thrw=false)106 double sleeper( double item, int ms, bool thrw = false) {
107     return sleeper_impl( item, ms, thrw);
108 }
109 
110 inline
sleeper(int item,int ms,bool thrw=false)111 int sleeper(int item, int ms, bool thrw = false) {
112     return sleeper_impl( item, ms, thrw);
113 }
114 
115 /*****************************************************************************
116 *   Done
117 *****************************************************************************/
118 //[wait_done
119 // Wrap canonical pattern for condition_variable + bool flag
120 struct Done {
121 private:
122     boost::fibers::condition_variable   cond;
123     boost::fibers::mutex                mutex;
124     bool                                ready = false;
125 
126 public:
127     typedef std::shared_ptr< Done >     ptr;
128 
waitDone129     void wait() {
130         std::unique_lock< boost::fibers::mutex > lock( mutex);
131         cond.wait( lock, [this](){ return ready; });
132     }
133 
notifyDone134     void notify() {
135         {
136             std::unique_lock< boost::fibers::mutex > lock( mutex);
137             ready = true;
138         } // release mutex
139         cond.notify_one();
140     }
141 };
142 //]
143 
144 /*****************************************************************************
145 *   when_any, simple completion
146 *****************************************************************************/
147 //[wait_first_simple_impl
148 // Degenerate case: when there are no functions to wait for, return
149 // immediately.
wait_first_simple_impl(Done::ptr)150 void wait_first_simple_impl( Done::ptr) {
151 }
152 
153 // When there's at least one function to wait for, launch it and recur to
154 // process the rest.
155 template< typename Fn, typename ... Fns >
wait_first_simple_impl(Done::ptr done,Fn && function,Fns &&...functions)156 void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
157     boost::fibers::fiber( [done, function](){
158                               function();
159                               done->notify();
160                           }).detach();
161     wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
162 }
163 //]
164 
165 // interface function: instantiate Done, launch tasks, wait for Done
166 //[wait_first_simple
167 template< typename ... Fns >
wait_first_simple(Fns &&...functions)168 void wait_first_simple( Fns && ... functions) {
169     // Use shared_ptr because each function's fiber will bind it separately,
170     // and we're going to return before the last of them completes.
171     auto done( std::make_shared< Done >() );
172     wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
173     done->wait();
174 }
175 //]
176 
177 // example usage
__anon9c9826e50302()178 Example wfs( runner, "wait_first_simple()", [](){
179 //[wait_first_simple_ex
180     wait_first_simple(
181             [](){ sleeper("wfs_long",   150); },
182             [](){ sleeper("wfs_medium", 100); },
183             [](){ sleeper("wfs_short",   50); });
184 //]
185 });
186 
187 /*****************************************************************************
188 *   when_any, return value
189 *****************************************************************************/
190 // When there's only one function, call this overload
191 //[wait_first_value_impl
192 template< typename T, typename Fn >
wait_first_value_impl(std::shared_ptr<boost::fibers::buffered_channel<T>> chan,Fn && function)193 void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
194                             Fn && function) {
195     boost::fibers::fiber( [chan, function](){
196                               // Ignore channel_op_status returned by push():
197                               // might be closed; we simply don't care.
198                               chan->push( function() );
199                           }).detach();
200 }
201 //]
202 
203 // When there are two or more functions, call this overload
204 template< typename T, typename Fn0, typename Fn1, typename ... Fns >
wait_first_value_impl(std::shared_ptr<boost::fibers::buffered_channel<T>> chan,Fn0 && function0,Fn1 && function1,Fns &&...functions)205 void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
206                             Fn0 && function0,
207                             Fn1 && function1,
208                             Fns && ... functions) {
209     // process the first function using the single-function overload
210     wait_first_value_impl< T >( chan,
211                                 std::forward< Fn0 >( function0) );
212     // then recur to process the rest
213     wait_first_value_impl< T >( chan,
214                                 std::forward< Fn1 >( function1),
215                                 std::forward< Fns >( functions) ... );
216 }
217 
218 //[wait_first_value
219 // Assume that all passed functions have the same return type. The return type
220 // of wait_first_value() is the return type of the first passed function. It is
221 // simply invalid to pass NO functions.
222 template< typename Fn, typename ... Fns >
223 typename std::result_of< Fn() >::type
wait_first_value(Fn && function,Fns &&...functions)224 wait_first_value( Fn && function, Fns && ... functions) {
225     typedef typename std::result_of< Fn() >::type return_t;
226     typedef boost::fibers::buffered_channel< return_t > channel_t;
227     auto chanp( std::make_shared< channel_t >( 64) );
228     // launch all the relevant fibers
229     wait_first_value_impl< return_t >( chanp,
230                                        std::forward< Fn >( function),
231                                        std::forward< Fns >( functions) ... );
232     // retrieve the first value
233     return_t value( chanp->value_pop() );
234     // close the channel: no subsequent push() has to succeed
235     chanp->close();
236     return value;
237 }
238 //]
239 
240 // example usage
__anon9c9826e50802()241 Example wfv( runner, "wait_first_value()", [](){
242 //[wait_first_value_ex
243     std::string result = wait_first_value(
244             [](){ return sleeper("wfv_third",  150); },
245             [](){ return sleeper("wfv_second", 100); },
246             [](){ return sleeper("wfv_first",   50); });
247     std::cout << "wait_first_value() => " << result << std::endl;
248     assert(result == "wfv_first");
249 //]
250 });
251 
252 /*****************************************************************************
253 *   when_any, produce first outcome, whether result or exception
254 *****************************************************************************/
255 // When there's only one function, call this overload.
256 //[wait_first_outcome_impl
257 template< typename T, typename CHANP, typename Fn >
wait_first_outcome_impl(CHANP chan,Fn && function)258 void wait_first_outcome_impl( CHANP chan, Fn && function) {
259     boost::fibers::fiber(
260         // Use std::bind() here for C++11 compatibility. C++11 lambda capture
261         // can't move a move-only Fn type, but bind() can. Let bind() move the
262         // channel pointer and the function into the bound object, passing
263         // references into the lambda.
264         std::bind(
265             []( CHANP & chan,
266                 typename std::decay< Fn >::type & function) {
267                 // Instantiate a packaged_task to capture any exception thrown by
268                 // function.
269                 boost::fibers::packaged_task< T() > task( function);
270                 // Immediately run this packaged_task on same fiber. We want
271                 // function() to have completed BEFORE we push the future.
272                 task();
273                 // Pass the corresponding future to consumer. Ignore
274                 // channel_op_status returned by push(): might be closed; we
275                 // simply don't care.
276                 chan->push( task.get_future() );
277             },
278             chan,
279             std::forward< Fn >( function)
280         )).detach();
281 }
282 //]
283 
284 // When there are two or more functions, call this overload
// Two or more functions: peel off the first, then recurse on the remainder.
template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
void wait_first_outcome_impl( CHANP chan,
                              Fn0 && function0,
                              Fn1 && function1,
                              Fns && ... functions) {
    // head goes through the single-function overload
    wait_first_outcome_impl< T >( chan, std::forward< Fn0 >( function0) );
    // tail (at least one function) is handled by recursion
    wait_first_outcome_impl< T >(
            chan,
            std::forward< Fn1 >( function1),
            std::forward< Fns >( functions) ... );
}
298 
299 // Assume that all passed functions have the same return type. The return type
300 // of wait_first_outcome() is the return type of the first passed function. It is
301 // simply invalid to pass NO functions.
302 //[wait_first_outcome
303 template< typename Fn, typename ... Fns >
304 typename std::result_of< Fn() >::type
wait_first_outcome(Fn && function,Fns &&...functions)305 wait_first_outcome( Fn && function, Fns && ... functions) {
306     // In this case, the value we pass through the channel is actually a
307     // future -- which is already ready. future can carry either a value or an
308     // exception.
309     typedef typename std::result_of< Fn() >::type return_t;
310     typedef boost::fibers::future< return_t > future_t;
311     typedef boost::fibers::buffered_channel< future_t > channel_t;
312     auto chanp(std::make_shared< channel_t >( 64) );
313     // launch all the relevant fibers
314     wait_first_outcome_impl< return_t >( chanp,
315                                          std::forward< Fn >( function),
316                                          std::forward< Fns >( functions) ... );
317     // retrieve the first future
318     future_t future( chanp->value_pop() );
319     // close the channel: no subsequent push() has to succeed
320     chanp->close();
321     // either return value or throw exception
322     return future.get();
323 }
324 //]
325 
326 // example usage
__anon9c9826e50d02()327 Example wfo( runner, "wait_first_outcome()", [](){
328 //[wait_first_outcome_ex
329     std::string result = wait_first_outcome(
330             [](){ return sleeper("wfos_first",   50); },
331             [](){ return sleeper("wfos_second", 100); },
332             [](){ return sleeper("wfos_third",  150); });
333     std::cout << "wait_first_outcome(success) => " << result << std::endl;
334     assert(result == "wfos_first");
335 
336     std::string thrown;
337     try {
338         result = wait_first_outcome(
339                 [](){ return sleeper("wfof_first",   50, true); },
340                 [](){ return sleeper("wfof_second", 100); },
341                 [](){ return sleeper("wfof_third",  150); });
342     } catch ( std::exception const& e) {
343         thrown = e.what();
344     }
345     std::cout << "wait_first_outcome(fail) threw '" << thrown
346               << "'" << std::endl;
347     assert(thrown == "wfof_first");
348 //]
349 });
350 
351 /*****************************************************************************
352 *   when_any, collect exceptions until success; throw exception_list if no
353 *   success
354 *****************************************************************************/
355 // define an exception to aggregate exception_ptrs; prefer
356 // std::exception_list (N4407 et al.) once that becomes available
357 //[exception_list
// Exception type that aggregates a sequence of std::exception_ptr values.
// what() reports the message given at construction; the collected
// exception_ptrs are reachable read-only through size()/begin()/end().
class exception_list : public std::runtime_error {
public:
    exception_list( std::string const& what) :
        std::runtime_error( what) {
    }

    typedef std::vector< std::exception_ptr >   bundle_t;

    // N4407 proposed std::exception_list API
    typedef bundle_t::const_iterator iterator;

    // number of collected exception_ptrs
    std::size_t size() const noexcept {
        return bundle_.size();
    }

    // first collected exception_ptr (insertion order)
    iterator begin() const noexcept {
        return bundle_.begin();
    }

    // one past the last collected exception_ptr
    iterator end() const noexcept {
        return bundle_.end();
    }

    // extension to populate: append 'ep' to the collection
    void add( std::exception_ptr ep) {
        // 'ep' is our own copy; move it in rather than copying a second time
        bundle_.push_back( std::move( ep) );
    }

private:
    bundle_t bundle_;
};
389 //]
390 
391 // Assume that all passed functions have the same return type. The return type
392 // of wait_first_success() is the return type of the first passed function. It is
393 // simply invalid to pass NO functions.
394 //[wait_first_success
395 template< typename Fn, typename ... Fns >
396 typename std::result_of< Fn() >::type
wait_first_success(Fn && function,Fns &&...functions)397 wait_first_success( Fn && function, Fns && ... functions) {
398     std::size_t count( 1 + sizeof ... ( functions) );
399     // In this case, the value we pass through the channel is actually a
400     // future -- which is already ready. future can carry either a value or an
401     // exception.
402     typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
403     typedef boost::fibers::future< return_t > future_t;
404     typedef boost::fibers::buffered_channel< future_t > channel_t;
405     auto chanp( std::make_shared< channel_t >( 64) );
406     // launch all the relevant fibers
407     wait_first_outcome_impl< return_t >( chanp,
408                                          std::forward< Fn >( function),
409                                          std::forward< Fns >( functions) ... );
410     // instantiate exception_list, just in case
411     exception_list exceptions("wait_first_success() produced only errors");
412     // retrieve up to 'count' results -- but stop there!
413     for ( std::size_t i = 0; i < count; ++i) {
414         // retrieve the next future
415         future_t future( chanp->value_pop() );
416         // retrieve exception_ptr if any
417         std::exception_ptr error( future.get_exception_ptr() );
418         // if no error, then yay, return value
419         if ( ! error) {
420             // close the channel: no subsequent push() has to succeed
421             chanp->close();
422             // show caller the value we got
423             return future.get();
424         }
425 
426         // error is non-null: collect
427         exceptions.add( error);
428     }
429     // We only arrive here when every passed function threw an exception.
430     // Throw our collection to inform caller.
431     throw exceptions;
432 }
433 //]
434 
435 // example usage
__anon9c9826e51402()436 Example wfss( runner, "wait_first_success()", [](){
437 //[wait_first_success_ex
438     std::string result = wait_first_success(
439                 [](){ return sleeper("wfss_first",   50, true); },
440                 [](){ return sleeper("wfss_second", 100); },
441                 [](){ return sleeper("wfss_third",  150); });
442     std::cout << "wait_first_success(success) => " << result << std::endl;
443     assert(result == "wfss_second");
444 //]
445 
446     std::string thrown;
447     std::size_t count = 0;
448     try {
449         result = wait_first_success(
450                 [](){ return sleeper("wfsf_first",   50, true); },
451                 [](){ return sleeper("wfsf_second", 100, true); },
452                 [](){ return sleeper("wfsf_third",  150, true); });
453     } catch ( exception_list const& e) {
454         thrown = e.what();
455         count = e.size();
456     } catch ( std::exception const& e) {
457         thrown = e.what();
458     }
459     std::cout << "wait_first_success(fail) threw '" << thrown << "': "
460               << count << " errors" << std::endl;
461     assert(thrown == "wait_first_success() produced only errors");
462     assert(count == 3);
463 });
464 
465 /*****************************************************************************
466 *   when_any, heterogeneous
467 *****************************************************************************/
468 //[wait_first_value_het
469 // No need to break out the first Fn for interface function: let the compiler
470 // complain if empty.
471 // Our functions have different return types, and we might have to return any
472 // of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
473 // parameter pack.
474 template< typename ... Fns >
475 boost::variant< typename std::result_of< Fns() >::type ... >
wait_first_value_het(Fns &&...functions)476 wait_first_value_het( Fns && ... functions) {
477     // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
478     typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
479     typedef boost::fibers::buffered_channel< return_t > channel_t;
480     auto chanp( std::make_shared< channel_t >( 64) );
481     // launch all the relevant fibers
482     wait_first_value_impl< return_t >( chanp,
483                                        std::forward< Fns >( functions) ... );
484     // retrieve the first value
485     return_t value( chanp->value_pop() );
486     // close the channel: no subsequent push() has to succeed
487     chanp->close();
488     return value;
489 }
490 //]
491 
492 // example usage
__anon9c9826e51b02()493 Example wfvh( runner, "wait_first_value_het()", [](){
494 //[wait_first_value_het_ex
495     boost::variant< std::string, double, int > result =
496         wait_first_value_het(
497                 [](){ return sleeper("wfvh_third",  150); },
498                 [](){ return sleeper(3.14,          100); },
499                 [](){ return sleeper(17,             50); });
500     std::cout << "wait_first_value_het() => " << result << std::endl;
501     assert(boost::get< int >( result) == 17);
502 //]
503 });
504 
505 /*****************************************************************************
506 *   when_all, simple completion
507 *****************************************************************************/
508 // Degenerate case: when there are no functions to wait for, return
509 // immediately.
wait_all_simple_impl(std::shared_ptr<boost::fibers::barrier>)510 void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
511 }
512 
513 // When there's at least one function to wait for, launch it and recur to
514 // process the rest.
515 //[wait_all_simple_impl
516 template< typename Fn, typename ... Fns >
wait_all_simple_impl(std::shared_ptr<boost::fibers::barrier> barrier,Fn && function,Fns &&...functions)517 void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
518                            Fn && function, Fns && ... functions) {
519     boost::fibers::fiber(
520             std::bind(
521                 []( std::shared_ptr< boost::fibers::barrier > & barrier,
522                     typename std::decay< Fn >::type & function) mutable {
523                         function();
524                         barrier->wait();
525                 },
526                 barrier,
527                 std::forward< Fn >( function)
528             )).detach();
529     wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
530 }
531 //]
532 
533 // interface function: instantiate barrier, launch tasks, wait for barrier
534 //[wait_all_simple
535 template< typename ... Fns >
wait_all_simple(Fns &&...functions)536 void wait_all_simple( Fns && ... functions) {
537     std::size_t count( sizeof ... ( functions) );
538     // Initialize a barrier(count+1) because we'll immediately wait on it. We
539     // don't want to wake up until 'count' more fibers wait on it. Even though
540     // we'll stick around until the last of them completes, use shared_ptr
541     // anyway because it's easier to be confident about lifespan issues.
542     auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
543     wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
544     barrier->wait();
545 }
546 //]
547 
548 // example usage
__anon9c9826e52002()549 Example was( runner, "wait_all_simple()", [](){
550 //[wait_all_simple_ex
551     wait_all_simple(
552             [](){ sleeper("was_long",   150); },
553             [](){ sleeper("was_medium", 100); },
554             [](){ sleeper("was_short",   50); });
555 //]
556 });
557 
558 /*****************************************************************************
559 *   when_all, return values
560 *****************************************************************************/
561 //[wait_nchannel
562 // Introduce a channel facade that closes the channel once a specific number
563 // of items has been pushed. This allows an arbitrary consumer to read until
564 // 'closed' without itself having to count items.
565 template< typename T >
566 class nchannel {
567 public:
nchannel(std::shared_ptr<boost::fibers::buffered_channel<T>> chan,std::size_t lm)568     nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
569               std::size_t lm):
570         chan_( chan),
571         limit_( lm) {
572         assert(chan_);
573         if ( 0 == limit_) {
574             chan_->close();
575         }
576     }
577 
push(T && va)578     boost::fibers::channel_op_status push( T && va) {
579         boost::fibers::channel_op_status ok =
580             chan_->push( std::forward< T >( va) );
581         if ( ok == boost::fibers::channel_op_status::success &&
582              --limit_ == 0) {
583             // after the 'limit_'th successful push, close the channel
584             chan_->close();
585         }
586         return ok;
587     }
588 
589 private:
590     std::shared_ptr< boost::fibers::buffered_channel< T > >    chan_;
591     std::size_t                                                 limit_;
592 };
593 //]
594 
595 // When there's only one function, call this overload
596 //[wait_all_values_impl
597 template< typename T, typename Fn >
wait_all_values_impl(std::shared_ptr<nchannel<T>> chan,Fn && function)598 void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
599                            Fn && function) {
600     boost::fibers::fiber( [chan, function](){
601                               chan->push(function());
602                           }).detach();
603 }
604 //]
605 
606 // When there are two or more functions, call this overload
607 template< typename T, typename Fn0, typename Fn1, typename ... Fns >
wait_all_values_impl(std::shared_ptr<nchannel<T>> chan,Fn0 && function0,Fn1 && function1,Fns &&...functions)608 void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
609                            Fn0 && function0,
610                            Fn1 && function1,
611                            Fns && ... functions) {
612     // process the first function using the single-function overload
613     wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
614     // then recur to process the rest
615     wait_all_values_impl< T >( chan,
616                                std::forward< Fn1 >( function1),
617                                std::forward< Fns >( functions) ... );
618 }
619 
620 //[wait_all_values_source
621 // Return a shared_ptr<buffered_channel<T>> from which the caller can
622 // retrieve each new result as it arrives, until 'closed'.
623 template< typename Fn, typename ... Fns >
624 std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
wait_all_values_source(Fn && function,Fns &&...functions)625 wait_all_values_source( Fn && function, Fns && ... functions) {
626     std::size_t count( 1 + sizeof ... ( functions) );
627     typedef typename std::result_of< Fn() >::type return_t;
628     typedef boost::fibers::buffered_channel< return_t > channel_t;
629     // make the channel
630     auto chanp( std::make_shared< channel_t >( 64) );
631     // and make an nchannel facade to close it after 'count' items
632     auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
633     // pass that nchannel facade to all the relevant fibers
634     wait_all_values_impl< return_t >( ncp,
635                                       std::forward< Fn >( function),
636                                       std::forward< Fns >( functions) ... );
637     // then return the channel for consumer
638     return chanp;
639 }
640 //]
641 
642 // When all passed functions have completed, return vector<T> containing
643 // collected results. Assume that all passed functions have the same return
644 // type. It is simply invalid to pass NO functions.
645 //[wait_all_values
646 template< typename Fn, typename ... Fns >
647 std::vector< typename std::result_of< Fn() >::type >
wait_all_values(Fn && function,Fns &&...functions)648 wait_all_values( Fn && function, Fns && ... functions) {
649     std::size_t count( 1 + sizeof ... ( functions) );
650     typedef typename std::result_of< Fn() >::type return_t;
651     typedef std::vector< return_t > vector_t;
652     vector_t results;
653     results.reserve( count);
654 
655     // get channel
656     std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
657         wait_all_values_source( std::forward< Fn >( function),
658                                 std::forward< Fns >( functions) ... );
659     // fill results vector
660     return_t value;
661     while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
662         results.push_back( value);
663     }
664     // return vector to caller
665     return results;
666 }
667 //]
668 
__anon9c9826e52502()669 Example wav( runner, "wait_all_values()", [](){
670 //[wait_all_values_source_ex
671     std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
672         wait_all_values_source(
673                 [](){ return sleeper("wavs_third",  150); },
674                 [](){ return sleeper("wavs_second", 100); },
675                 [](){ return sleeper("wavs_first",   50); });
676     std::string value;
677     while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
678         std::cout << "wait_all_values_source() => '" << value
679                   << "'" << std::endl;
680     }
681 //]
682 
683 //[wait_all_values_ex
684     std::vector< std::string > values =
685         wait_all_values(
686                 [](){ return sleeper("wav_late",   150); },
687                 [](){ return sleeper("wav_middle", 100); },
688                 [](){ return sleeper("wav_early",   50); });
689 //]
690     std::cout << "wait_all_values() =>";
691     for ( std::string const& v : values) {
692         std::cout << " '" << v << "'";
693     }
694     std::cout << std::endl;
695 });
696 
697 /*****************************************************************************
698 *   when_all, throw first exception
699 *****************************************************************************/
700 //[wait_all_until_error_source
701 // Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
702 // get() each new result as it arrives, until 'closed'.
703 template< typename Fn, typename ... Fns >
704 std::shared_ptr<
705     boost::fibers::buffered_channel<
706         boost::fibers::future<
707             typename std::result_of< Fn() >::type > > >
wait_all_until_error_source(Fn && function,Fns &&...functions)708 wait_all_until_error_source( Fn && function, Fns && ... functions) {
709     std::size_t count( 1 + sizeof ... ( functions) );
710     typedef typename std::result_of< Fn() >::type return_t;
711     typedef boost::fibers::future< return_t > future_t;
712     typedef boost::fibers::buffered_channel< future_t > channel_t;
713     // make the channel
714     auto chanp( std::make_shared< channel_t >( 64) );
715     // and make an nchannel facade to close it after 'count' items
716     auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
717     // pass that nchannel facade to all the relevant fibers
718     wait_first_outcome_impl< return_t >( ncp,
719                                          std::forward< Fn >( function),
720                                          std::forward< Fns >( functions) ... );
721     // then return the channel for consumer
722     return chanp;
723 }
724 //]
725 
726 // When all passed functions have completed, return vector<T> containing
727 // collected results, or throw the first exception thrown by any of the passed
728 // functions. Assume that all passed functions have the same return type. It
729 // is simply invalid to pass NO functions.
730 //[wait_all_until_error
731 template< typename Fn, typename ... Fns >
732 std::vector< typename std::result_of< Fn() >::type >
wait_all_until_error(Fn && function,Fns &&...functions)733 wait_all_until_error( Fn && function, Fns && ... functions) {
734     std::size_t count( 1 + sizeof ... ( functions) );
735     typedef typename std::result_of< Fn() >::type return_t;
736     typedef typename boost::fibers::future< return_t > future_t;
737     typedef std::vector< return_t > vector_t;
738     vector_t results;
739     results.reserve( count);
740 
741     // get channel
742     std::shared_ptr<
743         boost::fibers::buffered_channel< future_t > > chan(
744             wait_all_until_error_source( std::forward< Fn >( function),
745                                          std::forward< Fns >( functions) ... ) );
746     // fill results vector
747     future_t future;
748     while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
749         results.push_back( future.get() );
750     }
751     // return vector to caller
752     return results;
753 }
754 //]
755 
__anon9c9826e52c02()756 Example waue( runner, "wait_all_until_error()", [](){
757 //[wait_all_until_error_source_ex
758     typedef boost::fibers::future< std::string > future_t;
759     std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
760         wait_all_until_error_source(
761                 [](){ return sleeper("wauess_third",  150); },
762                 [](){ return sleeper("wauess_second", 100); },
763                 [](){ return sleeper("wauess_first",   50); });
764     future_t future;
765     while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
766         std::string value( future.get() );
767         std::cout << "wait_all_until_error_source(success) => '" << value
768                   << "'" << std::endl;
769     }
770 //]
771 
772     chan = wait_all_until_error_source(
773             [](){ return sleeper("wauesf_third",  150); },
774             [](){ return sleeper("wauesf_second", 100, true); },
775             [](){ return sleeper("wauesf_first",   50); });
776 //[wait_all_until_error_ex
777     std::string thrown;
778 //<-
779     try {
780         while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
781             std::string value( future.get() );
782             std::cout << "wait_all_until_error_source(fail) => '" << value
783                       << "'" << std::endl;
784         }
785     } catch ( std::exception const& e) {
786         thrown = e.what();
787     }
788     std::cout << "wait_all_until_error_source(fail) threw '" << thrown
789               << "'" << std::endl;
790 
791     thrown.clear();
792 //->
793     try {
794         std::vector< std::string > values = wait_all_until_error(
795                 [](){ return sleeper("waue_late",   150); },
796                 [](){ return sleeper("waue_middle", 100, true); },
797                 [](){ return sleeper("waue_early",   50); });
798 //<-
799         std::cout << "wait_all_until_error(fail) =>";
800         for ( std::string const& v : values) {
801             std::cout << " '" << v << "'";
802         }
803         std::cout << std::endl;
804 //->
805     } catch ( std::exception const& e) {
806         thrown = e.what();
807     }
808     std::cout << "wait_all_until_error(fail) threw '" << thrown
809               << "'" << std::endl;
810 //]
811 });
812 
813 /*****************************************************************************
814 *   when_all, collect exceptions
815 *****************************************************************************/
816 // When all passed functions have succeeded, return vector<T> containing
817 // collected results, or throw exception_list containing all exceptions thrown
818 // by any of the passed functions. Assume that all passed functions have the
819 // same return type. It is simply invalid to pass NO functions.
820 //[wait_all_collect_errors
821 template< typename Fn, typename ... Fns >
822 std::vector< typename std::result_of< Fn() >::type >
wait_all_collect_errors(Fn && function,Fns &&...functions)823 wait_all_collect_errors( Fn && function, Fns && ... functions) {
824     std::size_t count( 1 + sizeof ... ( functions) );
825     typedef typename std::result_of< Fn() >::type return_t;
826     typedef typename boost::fibers::future< return_t > future_t;
827     typedef std::vector< return_t > vector_t;
828     vector_t results;
829     results.reserve( count);
830     exception_list exceptions("wait_all_collect_errors() exceptions");
831 
832     // get channel
833     std::shared_ptr<
834         boost::fibers::buffered_channel< future_t > > chan(
835             wait_all_until_error_source( std::forward< Fn >( function),
836                                          std::forward< Fns >( functions) ... ) );
837     // fill results and/or exceptions vectors
838     future_t future;
839     while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
840         std::exception_ptr exp = future.get_exception_ptr();
841         if ( ! exp) {
842             results.push_back( future.get() );
843         } else {
844             exceptions.add( exp);
845         }
846     }
847     // if there were any exceptions, throw
848     if ( exceptions.size() ) {
849         throw exceptions;
850     }
851     // no exceptions: return vector to caller
852     return results;
853 }
854 //]
855 
__anon9c9826e53602()856 Example wace( runner, "wait_all_collect_errors()", [](){
857     std::vector< std::string > values = wait_all_collect_errors(
858             [](){ return sleeper("waces_late",   150); },
859             [](){ return sleeper("waces_middle", 100); },
860             [](){ return sleeper("waces_early",   50); });
861     std::cout << "wait_all_collect_errors(success) =>";
862     for ( std::string const& v : values) {
863         std::cout << " '" << v << "'";
864     }
865     std::cout << std::endl;
866 
867     std::string thrown;
868     std::size_t errors = 0;
869     try {
870         values = wait_all_collect_errors(
871                 [](){ return sleeper("wacef_late",   150, true); },
872                 [](){ return sleeper("wacef_middle", 100, true); },
873                 [](){ return sleeper("wacef_early",   50); });
874         std::cout << "wait_all_collect_errors(fail) =>";
875         for ( std::string const& v : values) {
876             std::cout << " '" << v << "'";
877         }
878         std::cout << std::endl;
879     } catch ( exception_list const& e) {
880         thrown = e.what();
881         errors = e.size();
882     } catch ( std::exception const& e) {
883         thrown = e.what();
884     }
885     std::cout << "wait_all_collect_errors(fail) threw '" << thrown
886               << "': " << errors << " errors" << std::endl;
887 });
888 
889 /*****************************************************************************
890 *   when_all, heterogeneous
891 *****************************************************************************/
// Build a Result by brace-initializing it from get() called on each of the
// passed futures, in argument order.
//[wait_all_members_get
template< typename Result, typename ... Futures >
Result wait_all_members_get( Futures && ... futures) {
    // Each future's result becomes one element of Result's initializer list.
    // A get() call may block, holding up the ones after it, but that costs
    // nothing: we cannot finish before the slowest future is ready anyway.
    // Because results are taken in argument-list order, not in order of
    // completion, if any get() throws it is the leftmost such exception that
    // reaches the caller.
    return Result{ futures.get() ... };
}
904 //]
905 
906 //[wait_all_members
907 // Explicitly pass Result. This can be any type capable of being initialized
908 // from the results of the passed functions, such as a struct.
909 template< typename Result, typename ... Fns >
wait_all_members(Fns &&...functions)910 Result wait_all_members( Fns && ... functions) {
911     // Run each of the passed functions on a separate fiber, passing all their
912     // futures to helper function for processing.
913     return wait_all_members_get< Result >(
914             boost::fibers::async( std::forward< Fns >( functions) ) ... );
915 }
916 //]
917 
// Aggregate filled in by the wait_all_members() example below: one string,
// one double and one int, streamable for display. Member order matters to
// code that brace-initializes a Data.
//[wait_Data
struct Data {
    std::string str;
    double      inexact;
    int         exact;

    friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
    ...*/
//<-
    {
        // emit the three members as Data{str='...', inexact=..., exact=...}
        out << "Data{str='" << data.str << "', inexact=" << data.inexact;
        out << ", exact=" << data.exact << "}";
        return out;
    }
//->
};
934 //]
935 
936 // example usage
__anon9c9826e53d02()937 Example wam( runner, "wait_all_members()", [](){
938 //[wait_all_members_data_ex
939     Data data = wait_all_members< Data >(
940             [](){ return sleeper("wams_left", 100); },
941             [](){ return sleeper(3.14,        150); },
942             [](){ return sleeper(17,          50); });
943     std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
944 //]
945 
946     std::string thrown;
947     try {
948         data = wait_all_members< Data >(
949                 [](){ return sleeper("wamf_left", 100, true); },
950                 [](){ return sleeper(3.14,        150); },
951                 [](){ return sleeper(17,          50, true); });
952         std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
953     } catch ( std::exception const& e) {
954         thrown = e.what();
955     }
956     std::cout << "wait_all_members<Data>(fail) threw '" << thrown
957               << '"' << std::endl;
958 
959 //[wait_all_members_vector_ex
960     // If we don't care about obtaining results as soon as they arrive, and we
961     // prefer a result vector in passed argument order rather than completion
962     // order, wait_all_members() is another possible implementation of
963     // wait_all_until_error().
964     auto strings = wait_all_members< std::vector< std::string > >(
965             [](){ return sleeper("wamv_left",   150); },
966             [](){ return sleeper("wamv_middle", 100); },
967             [](){ return sleeper("wamv_right",   50); });
968     std::cout << "wait_all_members<vector>() =>";
969     for ( std::string const& str : strings) {
970         std::cout << " '" << str << "'";
971     }
972     std::cout << std::endl;
973 //]
974 });
975 
976 
977 /*****************************************************************************
978 *   main()
979 *****************************************************************************/
main(int argc,char * argv[])980 int main( int argc, char *argv[]) {
981     runner.run();
982     std::cout << "done." << std::endl;
983     return EXIT_SUCCESS;
984 }
985