1 // Copyright Nat Goodspeed 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 // (See accompanying file LICENSE_1_0.txt or copy at
4 // http://www.boost.org/LICENSE_1_0.txt)
5 //
6 #include <algorithm>
7 #include <cassert>
8 #include <chrono>
9 #include <iostream>
10 #include <memory>
11 #include <sstream>
12 #include <string>
13 #include <type_traits>
14 #include <utility>
15 #include <vector>
16
17 #include <boost/fiber/all.hpp>
18 #include <boost/variant/variant.hpp>
19 #include <boost/variant/get.hpp>
20
21 // These are wait_something() functions rather than when_something()
22 // functions. A big part of the point of the Fiber library is to model
23 // sequencing using the processor's instruction pointer rather than chains of
24 // callbacks. The future-oriented when_all() / when_any() functions are still
25 // based on chains of callbacks. With Fiber, we can do better.
26
27 /*****************************************************************************
28 * Verbose
29 *****************************************************************************/
// Scope tracer: writes "<desc> start" to std::cout when constructed and
// "<desc> stop" when destroyed. Instances cannot be copied.
class Verbose {
    std::string const desc;

    // write desc followed by 'suffix', then end the line and flush
    void announce( char const* suffix) const {
        std::cout << desc << suffix << std::endl;
    }

public:
    Verbose( std::string const& d) :
        desc{ d } {
        announce( " start");
    }

    ~Verbose() {
        announce( " stop");
    }

    Verbose( Verbose const&) = delete;
    Verbose & operator=( Verbose const&) = delete;
};
47
48 /*****************************************************************************
49 * Runner and Example
50 *****************************************************************************/
51 // collect and ultimately run every Example
52 class Runner {
53 typedef std::vector< std::pair< std::string, std::function< void() > > > function_list;
54
55 public:
add(std::string const & desc,std::function<void ()> const & func)56 void add( std::string const& desc, std::function< void() > const& func) {
57 functions_.push_back( function_list::value_type( desc, func) );
58 }
59
run()60 void run() {
61 for ( function_list::value_type const& pair : functions_) {
62 Verbose v( pair.first);
63 pair.second();
64 }
65 }
66
67 private:
68 function_list functions_;
69 };
70
71 Runner runner;
72
73 // Example allows us to embed Runner::add() calls at module scope
74 struct Example {
ExampleExample75 Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
76 runner.add( desc, func);
77 }
78 };
79
80 /*****************************************************************************
81 * example task functions
82 *****************************************************************************/
83 //[wait_sleeper
84 template< typename T >
sleeper_impl(T item,int ms,bool thrw=false)85 T sleeper_impl( T item, int ms, bool thrw = false) {
86 std::ostringstream descb, funcb;
87 descb << item;
88 std::string desc( descb.str() );
89 funcb << " sleeper(" << item << ")";
90 Verbose v( funcb.str() );
91
92 boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
93 if ( thrw) {
94 throw std::runtime_error( desc);
95 }
96 return item;
97 }
98 //]
99
100 inline
sleeper(std::string const & item,int ms,bool thrw=false)101 std::string sleeper( std::string const& item, int ms, bool thrw = false) {
102 return sleeper_impl( item, ms, thrw);
103 }
104
105 inline
sleeper(double item,int ms,bool thrw=false)106 double sleeper( double item, int ms, bool thrw = false) {
107 return sleeper_impl( item, ms, thrw);
108 }
109
110 inline
sleeper(int item,int ms,bool thrw=false)111 int sleeper(int item, int ms, bool thrw = false) {
112 return sleeper_impl( item, ms, thrw);
113 }
114
115 /*****************************************************************************
116 * Done
117 *****************************************************************************/
118 //[wait_done
119 // Wrap canonical pattern for condition_variable + bool flag
120 struct Done {
121 private:
122 boost::fibers::condition_variable cond;
123 boost::fibers::mutex mutex;
124 bool ready = false;
125
126 public:
127 typedef std::shared_ptr< Done > ptr;
128
waitDone129 void wait() {
130 std::unique_lock< boost::fibers::mutex > lock( mutex);
131 cond.wait( lock, [this](){ return ready; });
132 }
133
notifyDone134 void notify() {
135 {
136 std::unique_lock< boost::fibers::mutex > lock( mutex);
137 ready = true;
138 } // release mutex
139 cond.notify_one();
140 }
141 };
142 //]
143
144 /*****************************************************************************
145 * when_any, simple completion
146 *****************************************************************************/
147 //[wait_first_simple_impl
148 // Degenerate case: when there are no functions to wait for, return
149 // immediately.
wait_first_simple_impl(Done::ptr)150 void wait_first_simple_impl( Done::ptr) {
151 }
152
153 // When there's at least one function to wait for, launch it and recur to
154 // process the rest.
155 template< typename Fn, typename ... Fns >
wait_first_simple_impl(Done::ptr done,Fn && function,Fns &&...functions)156 void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
157 boost::fibers::fiber( [done, function](){
158 function();
159 done->notify();
160 }).detach();
161 wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
162 }
163 //]
164
165 // interface function: instantiate Done, launch tasks, wait for Done
166 //[wait_first_simple
167 template< typename ... Fns >
wait_first_simple(Fns &&...functions)168 void wait_first_simple( Fns && ... functions) {
169 // Use shared_ptr because each function's fiber will bind it separately,
170 // and we're going to return before the last of them completes.
171 auto done( std::make_shared< Done >() );
172 wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
173 done->wait();
174 }
175 //]
176
177 // example usage
__anon9c9826e50302()178 Example wfs( runner, "wait_first_simple()", [](){
179 //[wait_first_simple_ex
180 wait_first_simple(
181 [](){ sleeper("wfs_long", 150); },
182 [](){ sleeper("wfs_medium", 100); },
183 [](){ sleeper("wfs_short", 50); });
184 //]
185 });
186
187 /*****************************************************************************
188 * when_any, return value
189 *****************************************************************************/
190 // When there's only one function, call this overload
191 //[wait_first_value_impl
192 template< typename T, typename Fn >
wait_first_value_impl(std::shared_ptr<boost::fibers::buffered_channel<T>> chan,Fn && function)193 void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
194 Fn && function) {
195 boost::fibers::fiber( [chan, function](){
196 // Ignore channel_op_status returned by push():
197 // might be closed; we simply don't care.
198 chan->push( function() );
199 }).detach();
200 }
201 //]
202
203 // When there are two or more functions, call this overload
204 template< typename T, typename Fn0, typename Fn1, typename ... Fns >
wait_first_value_impl(std::shared_ptr<boost::fibers::buffered_channel<T>> chan,Fn0 && function0,Fn1 && function1,Fns &&...functions)205 void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
206 Fn0 && function0,
207 Fn1 && function1,
208 Fns && ... functions) {
209 // process the first function using the single-function overload
210 wait_first_value_impl< T >( chan,
211 std::forward< Fn0 >( function0) );
212 // then recur to process the rest
213 wait_first_value_impl< T >( chan,
214 std::forward< Fn1 >( function1),
215 std::forward< Fns >( functions) ... );
216 }
217
218 //[wait_first_value
219 // Assume that all passed functions have the same return type. The return type
220 // of wait_first_value() is the return type of the first passed function. It is
221 // simply invalid to pass NO functions.
222 template< typename Fn, typename ... Fns >
223 typename std::result_of< Fn() >::type
wait_first_value(Fn && function,Fns &&...functions)224 wait_first_value( Fn && function, Fns && ... functions) {
225 typedef typename std::result_of< Fn() >::type return_t;
226 typedef boost::fibers::buffered_channel< return_t > channel_t;
227 auto chanp( std::make_shared< channel_t >( 64) );
228 // launch all the relevant fibers
229 wait_first_value_impl< return_t >( chanp,
230 std::forward< Fn >( function),
231 std::forward< Fns >( functions) ... );
232 // retrieve the first value
233 return_t value( chanp->value_pop() );
234 // close the channel: no subsequent push() has to succeed
235 chanp->close();
236 return value;
237 }
238 //]
239
240 // example usage
__anon9c9826e50802()241 Example wfv( runner, "wait_first_value()", [](){
242 //[wait_first_value_ex
243 std::string result = wait_first_value(
244 [](){ return sleeper("wfv_third", 150); },
245 [](){ return sleeper("wfv_second", 100); },
246 [](){ return sleeper("wfv_first", 50); });
247 std::cout << "wait_first_value() => " << result << std::endl;
248 assert(result == "wfv_first");
249 //]
250 });
251
252 /*****************************************************************************
253 * when_any, produce first outcome, whether result or exception
254 *****************************************************************************/
255 // When there's only one function, call this overload.
256 //[wait_first_outcome_impl
257 template< typename T, typename CHANP, typename Fn >
wait_first_outcome_impl(CHANP chan,Fn && function)258 void wait_first_outcome_impl( CHANP chan, Fn && function) {
259 boost::fibers::fiber(
260 // Use std::bind() here for C++11 compatibility. C++11 lambda capture
261 // can't move a move-only Fn type, but bind() can. Let bind() move the
262 // channel pointer and the function into the bound object, passing
263 // references into the lambda.
264 std::bind(
265 []( CHANP & chan,
266 typename std::decay< Fn >::type & function) {
267 // Instantiate a packaged_task to capture any exception thrown by
268 // function.
269 boost::fibers::packaged_task< T() > task( function);
270 // Immediately run this packaged_task on same fiber. We want
271 // function() to have completed BEFORE we push the future.
272 task();
273 // Pass the corresponding future to consumer. Ignore
274 // channel_op_status returned by push(): might be closed; we
275 // simply don't care.
276 chan->push( task.get_future() );
277 },
278 chan,
279 std::forward< Fn >( function)
280 )).detach();
281 }
282 //]
283
// Overload for two or more functions: peel off the first, then recur on the
// remainder until only the single-function overload applies.
template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
void wait_first_outcome_impl( CHANP chan,
                              Fn0 && function0,
                              Fn1 && function1,
                              Fns && ... functions) {
    // head: handled by the single-function overload
    wait_first_outcome_impl< T >( chan, std::forward< Fn0 >( function0) );
    // tail: function1 plus whatever follows it
    wait_first_outcome_impl< T >(
        chan,
        std::forward< Fn1 >( function1),
        std::forward< Fns >( functions) ... );
}
298
299 // Assume that all passed functions have the same return type. The return type
300 // of wait_first_outcome() is the return type of the first passed function. It is
301 // simply invalid to pass NO functions.
302 //[wait_first_outcome
303 template< typename Fn, typename ... Fns >
304 typename std::result_of< Fn() >::type
wait_first_outcome(Fn && function,Fns &&...functions)305 wait_first_outcome( Fn && function, Fns && ... functions) {
306 // In this case, the value we pass through the channel is actually a
307 // future -- which is already ready. future can carry either a value or an
308 // exception.
309 typedef typename std::result_of< Fn() >::type return_t;
310 typedef boost::fibers::future< return_t > future_t;
311 typedef boost::fibers::buffered_channel< future_t > channel_t;
312 auto chanp(std::make_shared< channel_t >( 64) );
313 // launch all the relevant fibers
314 wait_first_outcome_impl< return_t >( chanp,
315 std::forward< Fn >( function),
316 std::forward< Fns >( functions) ... );
317 // retrieve the first future
318 future_t future( chanp->value_pop() );
319 // close the channel: no subsequent push() has to succeed
320 chanp->close();
321 // either return value or throw exception
322 return future.get();
323 }
324 //]
325
326 // example usage
__anon9c9826e50d02()327 Example wfo( runner, "wait_first_outcome()", [](){
328 //[wait_first_outcome_ex
329 std::string result = wait_first_outcome(
330 [](){ return sleeper("wfos_first", 50); },
331 [](){ return sleeper("wfos_second", 100); },
332 [](){ return sleeper("wfos_third", 150); });
333 std::cout << "wait_first_outcome(success) => " << result << std::endl;
334 assert(result == "wfos_first");
335
336 std::string thrown;
337 try {
338 result = wait_first_outcome(
339 [](){ return sleeper("wfof_first", 50, true); },
340 [](){ return sleeper("wfof_second", 100); },
341 [](){ return sleeper("wfof_third", 150); });
342 } catch ( std::exception const& e) {
343 thrown = e.what();
344 }
345 std::cout << "wait_first_outcome(fail) threw '" << thrown
346 << "'" << std::endl;
347 assert(thrown == "wfof_first");
348 //]
349 });
350
351 /*****************************************************************************
352 * when_any, collect exceptions until success; throw exception_list if no
353 * success
354 *****************************************************************************/
355 // define an exception to aggregate exception_ptrs; prefer
356 // std::exception_list (N4407 et al.) once that becomes available
357 //[exception_list
// A std::runtime_error that additionally carries a sequence of
// std::exception_ptr values, readable through size()/begin()/end().
class exception_list : public std::runtime_error {
public:
    using bundle_t = std::vector< std::exception_ptr >;
    // N4407 proposed std::exception_list API
    using iterator = bundle_t::const_iterator;

private:
    bundle_t bundle_;

public:
    exception_list( std::string const& what) :
        std::runtime_error( what),
        bundle_() {
    }

    // number of collected exception_ptrs
    std::size_t size() const noexcept {
        return bundle_.size();
    }

    // read-only iteration over the collected exception_ptrs, in the order
    // they were added
    iterator begin() const noexcept {
        return bundle_.cbegin();
    }

    iterator end() const noexcept {
        return bundle_.cend();
    }

    // extension to populate: append one exception_ptr
    void add( std::exception_ptr ep) {
        bundle_.emplace_back( ep);
    }
};
389 //]
390
391 // Assume that all passed functions have the same return type. The return type
392 // of wait_first_success() is the return type of the first passed function. It is
393 // simply invalid to pass NO functions.
394 //[wait_first_success
395 template< typename Fn, typename ... Fns >
396 typename std::result_of< Fn() >::type
wait_first_success(Fn && function,Fns &&...functions)397 wait_first_success( Fn && function, Fns && ... functions) {
398 std::size_t count( 1 + sizeof ... ( functions) );
399 // In this case, the value we pass through the channel is actually a
400 // future -- which is already ready. future can carry either a value or an
401 // exception.
402 typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
403 typedef boost::fibers::future< return_t > future_t;
404 typedef boost::fibers::buffered_channel< future_t > channel_t;
405 auto chanp( std::make_shared< channel_t >( 64) );
406 // launch all the relevant fibers
407 wait_first_outcome_impl< return_t >( chanp,
408 std::forward< Fn >( function),
409 std::forward< Fns >( functions) ... );
410 // instantiate exception_list, just in case
411 exception_list exceptions("wait_first_success() produced only errors");
412 // retrieve up to 'count' results -- but stop there!
413 for ( std::size_t i = 0; i < count; ++i) {
414 // retrieve the next future
415 future_t future( chanp->value_pop() );
416 // retrieve exception_ptr if any
417 std::exception_ptr error( future.get_exception_ptr() );
418 // if no error, then yay, return value
419 if ( ! error) {
420 // close the channel: no subsequent push() has to succeed
421 chanp->close();
422 // show caller the value we got
423 return future.get();
424 }
425
426 // error is non-null: collect
427 exceptions.add( error);
428 }
429 // We only arrive here when every passed function threw an exception.
430 // Throw our collection to inform caller.
431 throw exceptions;
432 }
433 //]
434
435 // example usage
__anon9c9826e51402()436 Example wfss( runner, "wait_first_success()", [](){
437 //[wait_first_success_ex
438 std::string result = wait_first_success(
439 [](){ return sleeper("wfss_first", 50, true); },
440 [](){ return sleeper("wfss_second", 100); },
441 [](){ return sleeper("wfss_third", 150); });
442 std::cout << "wait_first_success(success) => " << result << std::endl;
443 assert(result == "wfss_second");
444 //]
445
446 std::string thrown;
447 std::size_t count = 0;
448 try {
449 result = wait_first_success(
450 [](){ return sleeper("wfsf_first", 50, true); },
451 [](){ return sleeper("wfsf_second", 100, true); },
452 [](){ return sleeper("wfsf_third", 150, true); });
453 } catch ( exception_list const& e) {
454 thrown = e.what();
455 count = e.size();
456 } catch ( std::exception const& e) {
457 thrown = e.what();
458 }
459 std::cout << "wait_first_success(fail) threw '" << thrown << "': "
460 << count << " errors" << std::endl;
461 assert(thrown == "wait_first_success() produced only errors");
462 assert(count == 3);
463 });
464
465 /*****************************************************************************
466 * when_any, heterogeneous
467 *****************************************************************************/
468 //[wait_first_value_het
469 // No need to break out the first Fn for interface function: let the compiler
470 // complain if empty.
471 // Our functions have different return types, and we might have to return any
472 // of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
473 // parameter pack.
474 template< typename ... Fns >
475 boost::variant< typename std::result_of< Fns() >::type ... >
wait_first_value_het(Fns &&...functions)476 wait_first_value_het( Fns && ... functions) {
477 // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
478 typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
479 typedef boost::fibers::buffered_channel< return_t > channel_t;
480 auto chanp( std::make_shared< channel_t >( 64) );
481 // launch all the relevant fibers
482 wait_first_value_impl< return_t >( chanp,
483 std::forward< Fns >( functions) ... );
484 // retrieve the first value
485 return_t value( chanp->value_pop() );
486 // close the channel: no subsequent push() has to succeed
487 chanp->close();
488 return value;
489 }
490 //]
491
492 // example usage
__anon9c9826e51b02()493 Example wfvh( runner, "wait_first_value_het()", [](){
494 //[wait_first_value_het_ex
495 boost::variant< std::string, double, int > result =
496 wait_first_value_het(
497 [](){ return sleeper("wfvh_third", 150); },
498 [](){ return sleeper(3.14, 100); },
499 [](){ return sleeper(17, 50); });
500 std::cout << "wait_first_value_het() => " << result << std::endl;
501 assert(boost::get< int >( result) == 17);
502 //]
503 });
504
505 /*****************************************************************************
506 * when_all, simple completion
507 *****************************************************************************/
508 // Degenerate case: when there are no functions to wait for, return
509 // immediately.
wait_all_simple_impl(std::shared_ptr<boost::fibers::barrier>)510 void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
511 }
512
513 // When there's at least one function to wait for, launch it and recur to
514 // process the rest.
515 //[wait_all_simple_impl
516 template< typename Fn, typename ... Fns >
wait_all_simple_impl(std::shared_ptr<boost::fibers::barrier> barrier,Fn && function,Fns &&...functions)517 void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
518 Fn && function, Fns && ... functions) {
519 boost::fibers::fiber(
520 std::bind(
521 []( std::shared_ptr< boost::fibers::barrier > & barrier,
522 typename std::decay< Fn >::type & function) mutable {
523 function();
524 barrier->wait();
525 },
526 barrier,
527 std::forward< Fn >( function)
528 )).detach();
529 wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
530 }
531 //]
532
533 // interface function: instantiate barrier, launch tasks, wait for barrier
534 //[wait_all_simple
535 template< typename ... Fns >
wait_all_simple(Fns &&...functions)536 void wait_all_simple( Fns && ... functions) {
537 std::size_t count( sizeof ... ( functions) );
538 // Initialize a barrier(count+1) because we'll immediately wait on it. We
539 // don't want to wake up until 'count' more fibers wait on it. Even though
540 // we'll stick around until the last of them completes, use shared_ptr
541 // anyway because it's easier to be confident about lifespan issues.
542 auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
543 wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
544 barrier->wait();
545 }
546 //]
547
548 // example usage
__anon9c9826e52002()549 Example was( runner, "wait_all_simple()", [](){
550 //[wait_all_simple_ex
551 wait_all_simple(
552 [](){ sleeper("was_long", 150); },
553 [](){ sleeper("was_medium", 100); },
554 [](){ sleeper("was_short", 50); });
555 //]
556 });
557
558 /*****************************************************************************
559 * when_all, return values
560 *****************************************************************************/
561 //[wait_nchannel
562 // Introduce a channel facade that closes the channel once a specific number
563 // of items has been pushed. This allows an arbitrary consumer to read until
564 // 'closed' without itself having to count items.
565 template< typename T >
566 class nchannel {
567 public:
nchannel(std::shared_ptr<boost::fibers::buffered_channel<T>> chan,std::size_t lm)568 nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
569 std::size_t lm):
570 chan_( chan),
571 limit_( lm) {
572 assert(chan_);
573 if ( 0 == limit_) {
574 chan_->close();
575 }
576 }
577
push(T && va)578 boost::fibers::channel_op_status push( T && va) {
579 boost::fibers::channel_op_status ok =
580 chan_->push( std::forward< T >( va) );
581 if ( ok == boost::fibers::channel_op_status::success &&
582 --limit_ == 0) {
583 // after the 'limit_'th successful push, close the channel
584 chan_->close();
585 }
586 return ok;
587 }
588
589 private:
590 std::shared_ptr< boost::fibers::buffered_channel< T > > chan_;
591 std::size_t limit_;
592 };
593 //]
594
595 // When there's only one function, call this overload
596 //[wait_all_values_impl
597 template< typename T, typename Fn >
wait_all_values_impl(std::shared_ptr<nchannel<T>> chan,Fn && function)598 void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
599 Fn && function) {
600 boost::fibers::fiber( [chan, function](){
601 chan->push(function());
602 }).detach();
603 }
604 //]
605
606 // When there are two or more functions, call this overload
607 template< typename T, typename Fn0, typename Fn1, typename ... Fns >
wait_all_values_impl(std::shared_ptr<nchannel<T>> chan,Fn0 && function0,Fn1 && function1,Fns &&...functions)608 void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
609 Fn0 && function0,
610 Fn1 && function1,
611 Fns && ... functions) {
612 // process the first function using the single-function overload
613 wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
614 // then recur to process the rest
615 wait_all_values_impl< T >( chan,
616 std::forward< Fn1 >( function1),
617 std::forward< Fns >( functions) ... );
618 }
619
620 //[wait_all_values_source
621 // Return a shared_ptr<buffered_channel<T>> from which the caller can
622 // retrieve each new result as it arrives, until 'closed'.
623 template< typename Fn, typename ... Fns >
624 std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
wait_all_values_source(Fn && function,Fns &&...functions)625 wait_all_values_source( Fn && function, Fns && ... functions) {
626 std::size_t count( 1 + sizeof ... ( functions) );
627 typedef typename std::result_of< Fn() >::type return_t;
628 typedef boost::fibers::buffered_channel< return_t > channel_t;
629 // make the channel
630 auto chanp( std::make_shared< channel_t >( 64) );
631 // and make an nchannel facade to close it after 'count' items
632 auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
633 // pass that nchannel facade to all the relevant fibers
634 wait_all_values_impl< return_t >( ncp,
635 std::forward< Fn >( function),
636 std::forward< Fns >( functions) ... );
637 // then return the channel for consumer
638 return chanp;
639 }
640 //]
641
642 // When all passed functions have completed, return vector<T> containing
643 // collected results. Assume that all passed functions have the same return
644 // type. It is simply invalid to pass NO functions.
645 //[wait_all_values
646 template< typename Fn, typename ... Fns >
647 std::vector< typename std::result_of< Fn() >::type >
wait_all_values(Fn && function,Fns &&...functions)648 wait_all_values( Fn && function, Fns && ... functions) {
649 std::size_t count( 1 + sizeof ... ( functions) );
650 typedef typename std::result_of< Fn() >::type return_t;
651 typedef std::vector< return_t > vector_t;
652 vector_t results;
653 results.reserve( count);
654
655 // get channel
656 std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
657 wait_all_values_source( std::forward< Fn >( function),
658 std::forward< Fns >( functions) ... );
659 // fill results vector
660 return_t value;
661 while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
662 results.push_back( value);
663 }
664 // return vector to caller
665 return results;
666 }
667 //]
668
__anon9c9826e52502()669 Example wav( runner, "wait_all_values()", [](){
670 //[wait_all_values_source_ex
671 std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
672 wait_all_values_source(
673 [](){ return sleeper("wavs_third", 150); },
674 [](){ return sleeper("wavs_second", 100); },
675 [](){ return sleeper("wavs_first", 50); });
676 std::string value;
677 while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
678 std::cout << "wait_all_values_source() => '" << value
679 << "'" << std::endl;
680 }
681 //]
682
683 //[wait_all_values_ex
684 std::vector< std::string > values =
685 wait_all_values(
686 [](){ return sleeper("wav_late", 150); },
687 [](){ return sleeper("wav_middle", 100); },
688 [](){ return sleeper("wav_early", 50); });
689 //]
690 std::cout << "wait_all_values() =>";
691 for ( std::string const& v : values) {
692 std::cout << " '" << v << "'";
693 }
694 std::cout << std::endl;
695 });
696
697 /*****************************************************************************
698 * when_all, throw first exception
699 *****************************************************************************/
700 //[wait_all_until_error_source
701 // Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
702 // get() each new result as it arrives, until 'closed'.
703 template< typename Fn, typename ... Fns >
704 std::shared_ptr<
705 boost::fibers::buffered_channel<
706 boost::fibers::future<
707 typename std::result_of< Fn() >::type > > >
wait_all_until_error_source(Fn && function,Fns &&...functions)708 wait_all_until_error_source( Fn && function, Fns && ... functions) {
709 std::size_t count( 1 + sizeof ... ( functions) );
710 typedef typename std::result_of< Fn() >::type return_t;
711 typedef boost::fibers::future< return_t > future_t;
712 typedef boost::fibers::buffered_channel< future_t > channel_t;
713 // make the channel
714 auto chanp( std::make_shared< channel_t >( 64) );
715 // and make an nchannel facade to close it after 'count' items
716 auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
717 // pass that nchannel facade to all the relevant fibers
718 wait_first_outcome_impl< return_t >( ncp,
719 std::forward< Fn >( function),
720 std::forward< Fns >( functions) ... );
721 // then return the channel for consumer
722 return chanp;
723 }
724 //]
725
726 // When all passed functions have completed, return vector<T> containing
727 // collected results, or throw the first exception thrown by any of the passed
728 // functions. Assume that all passed functions have the same return type. It
729 // is simply invalid to pass NO functions.
730 //[wait_all_until_error
731 template< typename Fn, typename ... Fns >
732 std::vector< typename std::result_of< Fn() >::type >
wait_all_until_error(Fn && function,Fns &&...functions)733 wait_all_until_error( Fn && function, Fns && ... functions) {
734 std::size_t count( 1 + sizeof ... ( functions) );
735 typedef typename std::result_of< Fn() >::type return_t;
736 typedef typename boost::fibers::future< return_t > future_t;
737 typedef std::vector< return_t > vector_t;
738 vector_t results;
739 results.reserve( count);
740
741 // get channel
742 std::shared_ptr<
743 boost::fibers::buffered_channel< future_t > > chan(
744 wait_all_until_error_source( std::forward< Fn >( function),
745 std::forward< Fns >( functions) ... ) );
746 // fill results vector
747 future_t future;
748 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
749 results.push_back( future.get() );
750 }
751 // return vector to caller
752 return results;
753 }
754 //]
755
__anon9c9826e52c02()756 Example waue( runner, "wait_all_until_error()", [](){
757 //[wait_all_until_error_source_ex
758 typedef boost::fibers::future< std::string > future_t;
759 std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
760 wait_all_until_error_source(
761 [](){ return sleeper("wauess_third", 150); },
762 [](){ return sleeper("wauess_second", 100); },
763 [](){ return sleeper("wauess_first", 50); });
764 future_t future;
765 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
766 std::string value( future.get() );
767 std::cout << "wait_all_until_error_source(success) => '" << value
768 << "'" << std::endl;
769 }
770 //]
771
772 chan = wait_all_until_error_source(
773 [](){ return sleeper("wauesf_third", 150); },
774 [](){ return sleeper("wauesf_second", 100, true); },
775 [](){ return sleeper("wauesf_first", 50); });
776 //[wait_all_until_error_ex
777 std::string thrown;
778 //<-
779 try {
780 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
781 std::string value( future.get() );
782 std::cout << "wait_all_until_error_source(fail) => '" << value
783 << "'" << std::endl;
784 }
785 } catch ( std::exception const& e) {
786 thrown = e.what();
787 }
788 std::cout << "wait_all_until_error_source(fail) threw '" << thrown
789 << "'" << std::endl;
790
791 thrown.clear();
792 //->
793 try {
794 std::vector< std::string > values = wait_all_until_error(
795 [](){ return sleeper("waue_late", 150); },
796 [](){ return sleeper("waue_middle", 100, true); },
797 [](){ return sleeper("waue_early", 50); });
798 //<-
799 std::cout << "wait_all_until_error(fail) =>";
800 for ( std::string const& v : values) {
801 std::cout << " '" << v << "'";
802 }
803 std::cout << std::endl;
804 //->
805 } catch ( std::exception const& e) {
806 thrown = e.what();
807 }
808 std::cout << "wait_all_until_error(fail) threw '" << thrown
809 << "'" << std::endl;
810 //]
811 });
812
813 /*****************************************************************************
814 * when_all, collect exceptions
815 *****************************************************************************/
816 // When all passed functions have succeeded, return vector<T> containing
817 // collected results, or throw exception_list containing all exceptions thrown
818 // by any of the passed functions. Assume that all passed functions have the
819 // same return type. It is simply invalid to pass NO functions.
820 //[wait_all_collect_errors
821 template< typename Fn, typename ... Fns >
822 std::vector< typename std::result_of< Fn() >::type >
wait_all_collect_errors(Fn && function,Fns &&...functions)823 wait_all_collect_errors( Fn && function, Fns && ... functions) {
824 std::size_t count( 1 + sizeof ... ( functions) );
825 typedef typename std::result_of< Fn() >::type return_t;
826 typedef typename boost::fibers::future< return_t > future_t;
827 typedef std::vector< return_t > vector_t;
828 vector_t results;
829 results.reserve( count);
830 exception_list exceptions("wait_all_collect_errors() exceptions");
831
832 // get channel
833 std::shared_ptr<
834 boost::fibers::buffered_channel< future_t > > chan(
835 wait_all_until_error_source( std::forward< Fn >( function),
836 std::forward< Fns >( functions) ... ) );
837 // fill results and/or exceptions vectors
838 future_t future;
839 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
840 std::exception_ptr exp = future.get_exception_ptr();
841 if ( ! exp) {
842 results.push_back( future.get() );
843 } else {
844 exceptions.add( exp);
845 }
846 }
847 // if there were any exceptions, throw
848 if ( exceptions.size() ) {
849 throw exceptions;
850 }
851 // no exceptions: return vector to caller
852 return results;
853 }
854 //]
855
__anon9c9826e53602()856 Example wace( runner, "wait_all_collect_errors()", [](){
857 std::vector< std::string > values = wait_all_collect_errors(
858 [](){ return sleeper("waces_late", 150); },
859 [](){ return sleeper("waces_middle", 100); },
860 [](){ return sleeper("waces_early", 50); });
861 std::cout << "wait_all_collect_errors(success) =>";
862 for ( std::string const& v : values) {
863 std::cout << " '" << v << "'";
864 }
865 std::cout << std::endl;
866
867 std::string thrown;
868 std::size_t errors = 0;
869 try {
870 values = wait_all_collect_errors(
871 [](){ return sleeper("wacef_late", 150, true); },
872 [](){ return sleeper("wacef_middle", 100, true); },
873 [](){ return sleeper("wacef_early", 50); });
874 std::cout << "wait_all_collect_errors(fail) =>";
875 for ( std::string const& v : values) {
876 std::cout << " '" << v << "'";
877 }
878 std::cout << std::endl;
879 } catch ( exception_list const& e) {
880 thrown = e.what();
881 errors = e.size();
882 } catch ( std::exception const& e) {
883 thrown = e.what();
884 }
885 std::cout << "wait_all_collect_errors(fail) threw '" << thrown
886 << "': " << errors << " errors" << std::endl;
887 });
888
889 /*****************************************************************************
890 * when_all, heterogeneous
891 *****************************************************************************/
892 //[wait_all_members_get
template< typename Result, typename ... Futures >
Result wait_all_members_get( Futures && ... futures) {
    // Brace-initialize Result from one get() per future. Elements of a
    // braced initializer list are evaluated left to right, so the values are
    // fetched in argument order, not completion order. Blocking in an early
    // get() costs nothing overall: the call cannot finish before the slowest
    // future does anyway. A consequence of the ordering is that when several
    // futures hold exceptions, the one belonging to the leftmost argument is
    // the one the caller sees.
    return Result{ futures.get() ... };
}
904 //]
905
906 //[wait_all_members
907 // Explicitly pass Result. This can be any type capable of being initialized
908 // from the results of the passed functions, such as a struct.
909 template< typename Result, typename ... Fns >
wait_all_members(Fns &&...functions)910 Result wait_all_members( Fns && ... functions) {
911 // Run each of the passed functions on a separate fiber, passing all their
912 // futures to helper function for processing.
913 return wait_all_members_get< Result >(
914 boost::fibers::async( std::forward< Fns >( functions) ) ... );
915 }
916 //]
917
918 // used by following example
919 //[wait_Data
// Plain aggregate with one member per result type.
struct Data {
    std::string str;
    double      inexact;
    int         exact;

    // Writes the three members as "Data{str='...', inexact=..., exact=...}"
    // and returns the stream to allow chaining.
    friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
    ...*/
//<-
    {
        out << "Data{str='" << data.str << "', inexact=" << data.inexact;
        out << ", exact=" << data.exact << "}";
        return out;
    }
//->
};
934 //]
935
936 // example usage
__anon9c9826e53d02()937 Example wam( runner, "wait_all_members()", [](){
938 //[wait_all_members_data_ex
939 Data data = wait_all_members< Data >(
940 [](){ return sleeper("wams_left", 100); },
941 [](){ return sleeper(3.14, 150); },
942 [](){ return sleeper(17, 50); });
943 std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
944 //]
945
946 std::string thrown;
947 try {
948 data = wait_all_members< Data >(
949 [](){ return sleeper("wamf_left", 100, true); },
950 [](){ return sleeper(3.14, 150); },
951 [](){ return sleeper(17, 50, true); });
952 std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
953 } catch ( std::exception const& e) {
954 thrown = e.what();
955 }
956 std::cout << "wait_all_members<Data>(fail) threw '" << thrown
957 << '"' << std::endl;
958
959 //[wait_all_members_vector_ex
960 // If we don't care about obtaining results as soon as they arrive, and we
961 // prefer a result vector in passed argument order rather than completion
962 // order, wait_all_members() is another possible implementation of
963 // wait_all_until_error().
964 auto strings = wait_all_members< std::vector< std::string > >(
965 [](){ return sleeper("wamv_left", 150); },
966 [](){ return sleeper("wamv_middle", 100); },
967 [](){ return sleeper("wamv_right", 50); });
968 std::cout << "wait_all_members<vector>() =>";
969 for ( std::string const& str : strings) {
970 std::cout << " '" << str << "'";
971 }
972 std::cout << std::endl;
973 //]
974 });
975
976
977 /*****************************************************************************
978 * main()
979 *****************************************************************************/
main(int argc,char * argv[])980 int main( int argc, char *argv[]) {
981 runner.run();
982 std::cout << "done." << std::endl;
983 return EXIT_SUCCESS;
984 }
985