[/
      Copyright Oliver Kowalke, Nat Goodspeed 2015.
 Distributed under the Boost Software License, Version 1.0.
    (See accompanying file LICENSE_1_0.txt or copy at
          http://www.boost.org/LICENSE_1_0.txt
]

[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]

[#when_any]
[section:when_any when_any / when_all functionality]

[heading Overview]

A bit of wisdom from the early days of computing still holds true today:
prefer to model program state using the instruction pointer rather than with
Boolean flags. In other words, if the program must ["do something] and then
do something almost the same, but with minor changes... perhaps parts of that
something should be broken out as smaller separate functions, rather than
introducing flags to alter the internal behavior of a monolithic function.

To that we would add: prefer to describe control flow using C++ native
constructs such as function calls, `if`, `while`, `for`, `do` et al.,
rather than as chains of callbacks.

One of the great strengths of __boost_fiber__ is the flexibility it confers on
the coder to restructure an application from chains of callbacks to a
straightforward C++ statement sequence, even when code in that fiber is
in fact interleaved with code running in other fibers.

There has been much recent discussion about the benefits of `when_any` and
`when_all` functionality. When dealing with asynchronous and possibly
unreliable services, these are valuable idioms. But of course `when_any` and
`when_all` are closely tied to the use of chains of callbacks.

This section presents recipes for achieving the same ends, in the context of a
fiber that wants to ["do something] when one or more other independent
activities have completed. Accordingly, these are `wait_something()`
functions rather than `when_something()` functions. The expectation is that
the calling fiber asks to launch those independent activities, then waits for
them, then sequentially proceeds with whatever processing depends on those
results.

The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
are for illustrative purposes only, because all these functions have been
bundled into a single source file. Presumably, if (say) [link
wait_first_success `wait_first_success()`] best suits your application needs,
you could introduce that variant with the name `wait_any()`.

[note The functions presented in this section accept variadic argument lists
of task functions. Corresponding `wait_something()` functions accepting a
container of task functions are left as an exercise for the interested reader.
Those should actually be simpler. Most of the complexity would arise from
overloading the same name for both purposes.]

[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in
[@../../examples/wait_stuff.cpp wait_stuff.cpp].

[heading Example Task Function]

[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:

[wait_sleeper]

with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
`int`.

`Verbose` simply prints a message to `std::cout` on construction and
destruction.

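`Verbose` itself is not reproduced as a snippet in this section. A minimal
sketch of such an RAII tracer [mdash] illustrative only; the real class lives
in [@../../examples/wait_stuff.cpp wait_stuff.cpp] and its exact messages may
differ [mdash] might look like:

    class Verbose {
    public:
        // announce construction
        Verbose( std::string d) :
            desc( std::move( d) ) {
            std::cout << desc << " start" << std::endl;
        }

        // announce destruction
        ~Verbose() {
            std::cout << desc << " stop" << std::endl;
        }

    private:
        std::string desc;
    };
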
Basically:

# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed
  `item`;
# else returns the passed `item`;
# on the way out, `sleeper()` produces a stop message.

This function will feature in the example calls to the various functions
presented below.

[section when_any]
[#wait_first_simple_section]
[section when_any, simple completion]

The simplest case is when you only need to know that the first of a set of
asynchronous tasks has completed [mdash] but you don't need to obtain a return
value, and you're confident that they will not throw exceptions.

[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a
[class_link condition_variable] and a [class_link mutex]:

[wait_done]

The pattern we follow throughout this section is to pass a
[@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
to the relevant synchronization object into the various tasks' fiber
functions. This eliminates nagging questions about the lifespan of the
synchronization object relative to the last of the fibers.

[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:

[wait_first_simple]

[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack,
capturing `Done::ptr` for each new fiber:

[wait_first_simple_impl]

The body of the fiber's lambda is extremely simple, as promised: call the
function, notify [link wait_done `Done`] when it returns. The first fiber to
do so allows `wait_first_simple()` to return [mdash] which is why it's useful
to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
rather than declaring it as a stack variable in `wait_first_simple()`.

This is how you might call it:

[wait_first_simple_ex]

In this example, control resumes after `wait_first_simple()` when [link
wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
other two `sleeper()` fibers are still running.

[endsect]
[section when_any, return value]

It seems more useful to add the ability to capture the return value from the
first of the task functions to complete. Again, we assume that none will throw
an exception.

One tactic would be to adapt our [link wait_done `Done`] class to store the
first of the return values, rather than a simple `bool`. However, we choose
instead to use a [template_link buffered_channel]. We'll only need to enqueue
the first value, so we'll [member_link buffered_channel..close] it once we've
retrieved that value. Subsequent `push()` calls will return `closed`.

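The channel protocol this recipe relies on can be seen in a standalone sketch
[mdash] hypothetical code, not part of wait_stuff.cpp; note that a
[template_link buffered_channel]'s capacity must be a power of two:

    boost::fibers::buffered_channel< std::string > chan{ 2 };
    chan.push( std::string("first") );  // accepted: channel_op_status::success
    std::string value;
    chan.pop( value);                   // consumer retrieves "first"
    chan.close();                       // consumer wants no more values
    // a tardy producer is refused rather than left blocked:
    boost::fibers::channel_op_status rc = chan.push( std::string("second") );
    // rc == boost::fibers::channel_op_status::closed
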
[#wait_first_value]
[wait_first_value]

[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:

[wait_first_value_impl]

It calls the passed function, pushes its return value and ignores the `push()`
result. You might call it like this:

[wait_first_value_ex]

[endsect]
[section when_any, produce first outcome, whether result or exception]

We may not be running in an environment in which we can guarantee no exception
will be thrown by any of our task functions. In that case, the above
implementations of `wait_first_something()` would be naïve: as mentioned in
[link exceptions the section on Fiber Management], an uncaught exception in one
of our task fibers would cause `std::terminate()` to be called.

Let's at least ensure that such an exception would propagate to the fiber
awaiting the first result. We can use [template_link future] to transport
either a return value or an exception. Therefore, we will change [link
wait_first_value `wait_first_value()`]'s [template_link buffered_channel] to
hold `future< T >` items instead of simply `T`.

Once we have a `future<>` in hand, all we need do is call [member_link
future..get], which will either return the value or rethrow the exception.

[#wait_first_outcome]
[wait_first_outcome]

So far so good [mdash] but there's a timing issue. How should we obtain the
`future<>` to [member_link buffered_channel..push] on the queue?

We could call [ns_function_link fibers..async]. That would certainly produce a
`future<>` for the task function. The trouble is that it would return too
quickly! We only want `future<>` items for ['completed] tasks on our
`queue<>`. In fact, we only want the `future<>` for the one that
completes first. If each fiber launched by `wait_first_outcome()` were to
`push()` the result of calling `async()`, the queue would only ever report
the result of the leftmost task item [mdash] ['not] the one that completes most
quickly.

Calling [member_link future..get] on the future returned by `async()` wouldn't
be right. You can only call `get()` once per `future<>` instance! And if there
were an exception, it would be rethrown inside the helper fiber at the
producer end of the queue, rather than propagated to the consumer end.

We could call [member_link future..wait]. That would block the helper fiber
until the `future<>` became ready, at which point we could `push()` it to be
retrieved by `wait_first_outcome()`.

That would work [mdash] but there's a simpler tactic that avoids creating an
extra fiber. We can wrap the task function in a [template_link packaged_task].
While one naturally thinks of passing a `packaged_task<>` to a new fiber
[mdash] that is, in fact, what `async()` does [mdash] in this case, we're
already running in the helper fiber at the producer end of the queue! We can
simply ['call] the `packaged_task<>`. On return from that call, the task
function has completed, meaning that the `future<>` obtained from the
`packaged_task<>` is certain to be ready. At that point we can simply `push()`
it to the queue.

[#wait_first_outcome_impl]
[wait_first_outcome_impl]

Calling it might look like this:

[wait_first_outcome_ex]

[endsect]
[section when_any, produce first success]

One scenario for ["when_any] functionality is when we're redundantly
contacting some number of possibly-unreliable web services. Not only might
they be slow [mdash] any one of them might produce a failure rather than the
desired result.

In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
right approach. If one of the services produces an error quickly, while
another follows up with a real answer, we don't want to prefer the error just
because it arrived first!

Given the `queue< future< T > >` we already constructed for
`wait_first_outcome()`, though, we can readily recast the interface function
to deliver the first ['successful] result.

That does raise the question: what if ['all] the task functions throw an
exception? In that case we'd probably better know about it.

[#exception_list]
The
[@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
C++ Parallelism Draft Technical Specification] proposes a
`std::exception_list` exception capable of delivering a collection of
`std::exception_ptr`s. Until that becomes universally available, let's fake up
an `exception_list` of our own:

[exception_list]

Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
`wait_first_outcome_impl()`].

Instead of retrieving only the first `future<>` from the queue, we must now
loop over `future<>` items. Of course we must limit that iteration! If we
launch only `count` producer fibers, the `(count+1)`[superscript st]
[member_link buffered_channel..pop] call would block forever.

Given a ready `future<>`, we can distinguish failure by calling [member_link
future..get_exception_ptr]. If the `future<>` in fact contains a result rather
than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
can confidently call [member_link future..get] to return that result to our
caller.

If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
our pending `exception_list` and loop back for the next `future<>` from the
queue.

If we fall out of the loop [mdash] if every single task fiber threw an
exception [mdash] we throw the `exception_list` exception into which we've
been collecting those `std::exception_ptr`s.

[#wait_first_success]
[wait_first_success]

A call might look like this:

[wait_first_success_ex]

[endsect]
[section when_any, heterogeneous types]

We would be remiss to ignore the case in which the various task functions have
distinct return types. That means that the value returned by the first of them
might have any one of those types. We can express that with
[@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].

To keep the example simple, we'll revert to pretending that none of them can
throw an exception. That makes `wait_first_value_het()` strongly resemble
[link wait_first_value `wait_first_value()`]. We can actually reuse [link
wait_first_value_impl `wait_first_value_impl()`], merely passing
`boost::variant<T0, T1, ...>` as the queue's value type rather than the
common `T`!

Naturally this could be extended to use [link wait_first_success
`wait_first_success()`] semantics instead.

[wait_first_value_het]

It might be called like this:

[wait_first_value_het_ex]

[endsect]
[section when_any, a dubious alternative]

Certain topics in C++ can arouse strong passions, and exceptions are no
exception. We cannot resist mentioning [mdash] for purely informational
purposes [mdash] that when you need only the ['first] result from some number
of concurrently-running fibers, it would be possible to pass a
[^shared_ptr<[template_link promise]>] to the participating fibers, then cause
the initiating fiber to call [member_link future..get] on its [template_link
future]. The first fiber to call [member_link promise..set_value] on that
shared `promise` will succeed; subsequent `set_value()` calls on the same
`promise` instance will throw `future_error`.

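A minimal sketch of that tactic [mdash] hypothetical code, not part of
wait_stuff.cpp, assuming `tasks` is some collection of callables returning
`std::string` [mdash] might look like this:

    auto pp = std::make_shared< boost::fibers::promise< std::string > >();
    boost::fibers::future< std::string > fut = pp->get_future();
    for ( auto const& task : tasks) {
        boost::fibers::fiber([pp, task](){
            try {
                // only the first set_value() succeeds
                pp->set_value( task() );
            } catch ( boost::fibers::future_error const&) {
                // promise_already_satisfied: another fiber won the race;
                // quietly discard this result (exceptions thrown by task()
                // itself are not handled in this sketch)
            }
        }).detach();
    }
    std::string first( fut.get() );     // the first result to arrive

Note that the task fibers keep the shared `promise` alive via the captured
`shared_ptr`, per the lifespan pattern described earlier.
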
Use this information at your own discretion. Beware the dark side.

[endsect]
[endsect][/ when_any]

[section when_all functionality]
[section when_all, simple completion]

For the case in which we must wait for ['all] task functions to complete
[mdash] but we don't need results (or expect exceptions) from any of them
[mdash] we can write `wait_all_simple()`, which looks remarkably like [link
wait_first_simple `wait_first_simple()`]. The difference is that instead of
our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
call its [member_link barrier..wait].

We initialize the `barrier` with `(count+1)` because, in addition to the
`count` fibers we launch, `wait_all_simple()` itself also calls `wait()`.

[wait_all_simple]

As stated above, the only difference between `wait_all_simple_impl()` and
[link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
calls `barrier::wait()` rather than `Done::notify()`:

[wait_all_simple_impl]

You might call it like this:

[wait_all_simple_ex]

Control will not return from the `wait_all_simple()` call until the last of
its task functions has completed.

[endsect]
[section when_all, return values]

As soon as we want to collect return values from all the task functions, we
can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
`queue<T>` for the purpose. All we have to do is avoid closing it after the
first value!

But in fact, collecting multiple values raises an interesting question: do we
['really] want to wait until the slowest of them has arrived? Wouldn't we
rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define `wait_all_values_source()`
to return `shared_ptr<buffered_channel<T>>`.

[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement
`wait_all_values()`:

[wait_all_values]

It might be called like this:

[wait_all_values_ex]

As you can see from the loop in `wait_all_values()`, instead of requiring its
caller to count values, we define `wait_all_values_source()` to [member_link
buffered_channel..close] the queue when done. But how do we do that? Each
producer fiber is independent. It has no idea whether it is the last one to
[member_link buffered_channel..push] a value.

[#wait_nqueue]
We can address that problem with a counting façade for the
`queue<>`. In fact, our façade need only support the producer end of
the queue.

[wait_nqueue]

[#wait_all_values_source]
Armed with `nqueue<>`, we can implement `wait_all_values_source()`. It
starts just like [link wait_first_value `wait_first_value()`]. The difference
is that we wrap the `queue<T>` with an `nqueue<T>` to pass to
the producer fibers.

Then, of course, instead of popping the first value, closing the queue and
returning it, we simply return the `shared_ptr<queue<T>>`.

[wait_all_values_source]

For example:

[wait_all_values_source_ex]

[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl
`wait_first_value_impl()`] except for the use of `nqueue<T>` rather than
`queue<T>`:

[wait_all_values_impl]

[endsect]
[section when_all, until first exception]

Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
can elaborate [link wait_all_values `wait_all_values()`] and [link
wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
instead of plain `T`.

[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link
future..get]:

[wait_all_until_error]

For example:

[wait_all_until_error_ex]

[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The
caller must both retrieve a `future< T >` and call its `get()` method. It
would, of course, be possible to return a façade over the consumer end of the
queue that would implicitly perform the `get()` and return a simple `T` (or
throw).

The implementation is just as you would expect. Notice, however, that we can
reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
`nqueue<T>` rather than `queue<T>`.

[wait_all_until_error_source]

For example:

[wait_all_until_error_source_ex]

[endsect]
[section when_all, collecting all exceptions]

[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
might be more reasonable to make a `wait_all_...()` that collects ['all]
errors instead of presenting only the first:

[wait_all_collect_errors]

The implementation is a simple variation on [link wait_first_success
`wait_first_success()`], using the same [link exception_list `exception_list`]
exception class.

[endsect]
[section when_all, heterogeneous types]

But what about the case in which we must wait for all results, and they have
distinct types?

We can present an API that is frankly quite cool. Consider a sample struct:

[wait_Data]

Let's fill its members from task functions all running concurrently:

[wait_all_members_data_ex]

Note that for this case we abandon the notion of capturing results in order
of arrival: we must fill the passed struct in left-to-right order.

That permits a beautifully simple implementation:

[wait_all_members]

[wait_all_members_get]

It is tempting to try to implement `wait_all_members()` as a one-liner like
this:

    return Result{ boost::fibers::async(functions).get()... };

The trouble with this tactic is that it would serialize all the task
functions. The runtime makes a single pass through `functions`, calling
[ns_function_link fibers..async] for each and then immediately calling
[member_link future..get] on its returned `future<>`. That blocks the implicit
loop. The above is almost equivalent to writing:

    return Result{ functions()... };

in which, of course, there is no concurrency at all.

Passing the argument pack through a function-call boundary
(`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
`wait_all_members()` to collect the `future<>`s from all the `async()` calls,
the second in `wait_all_members_get()` to fetch each of the results.

As noted in the code comments, within the `wait_all_members_get()` parameter
pack expansion pass, the blocking behavior of `get()` becomes irrelevant.
Along the way, we will hit the `get()` for the slowest task function; after
that every subsequent `get()` will complete in trivial time.

By the way, we could also use this same API to fill a vector or other
collection:

[wait_all_members_vector_ex]

[endsect]
[endsect][/ when_all]

[endsect][/ outermost]