[/
      Copyright Oliver Kowalke, Nat Goodspeed 2015.
 Distributed under the Boost Software License, Version 1.0.
    (See accompanying file LICENSE_1_0.txt or copy at
          http://www.boost.org/LICENSE_1_0.txt
]

[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]

[#when_any]
[section:when_any when_any / when_all functionality]

[heading Overview]

A bit of wisdom from the early days of computing still holds true today:
prefer to model program state using the instruction pointer rather than with
Boolean flags. In other words, if the program must ["do something] and then
do something almost the same, but with minor changes... perhaps parts of that
something should be broken out as smaller separate functions, rather than
introducing flags to alter the internal behavior of a monolithic function.

To that we would add: prefer to describe control flow using C++ native
constructs such as function calls, `if`, `while`, `for`, `do` et al.
rather than as chains of callbacks.

One of the great strengths of __boost_fiber__ is the flexibility it confers on
the coder to restructure an application from chains of callbacks to
straightforward C++ statement sequence, even when code in that fiber is
in fact interleaved with code running in other fibers.

There has been much recent discussion about the benefits of when_any and
when_all functionality. When dealing with asynchronous and possibly unreliable
services, these are valuable idioms. But of course when_any and when_all are
closely tied to the use of chains of callbacks.

This section presents recipes for achieving the same ends, in the context of a
fiber that wants to ["do something] when one or more other independent
activities have completed. Accordingly, these are `wait_something()`
functions rather than `when_something()` functions. The expectation is that
the calling fiber asks to launch those independent activities, then waits for
them, then sequentially proceeds with whatever processing depends on those
results.

The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
are for illustrative purposes only, because all these functions have been
bundled into a single source file. Presumably, if (say) [link
wait_first_success `wait_first_success()`] best suits your application needs,
you could introduce that variant with the name `wait_any()`.

[note The functions presented in this section accept variadic argument lists
of task functions. Corresponding `wait_something()` functions accepting a
container of task functions are left as an exercise for the interested reader.
Those should actually be simpler. Most of the complexity would arise from
overloading the same name for both purposes.]

[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in
[@../../examples/wait_stuff.cpp wait_stuff.cpp].

[heading Example Task Function]

[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:

[wait_sleeper]

with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
`int`.

`Verbose` simply prints a message to `std::cout` on construction and
destruction.

Basically:

# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed
  `item`;
# else returns the passed `item`.
# On the way out, `sleeper()` produces a stop message.

This function will feature in the example calls to the various functions
presented below.

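The numbered steps above can be sketched with nothing but the standard library. This is a hedged stand-in, not the code imported from `wait_stuff.cpp`: `sleeper_sketch()` and `VerboseSketch` are hypothetical names, `std::this_thread::sleep_for()` replaces the fiber-aware sleep (so it blocks the whole thread instead of yielding to other fibers), and we assume the string description is delivered inside a `std::runtime_error`.

```cpp
#include <cassert>
#include <chrono>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical stand-in for Verbose: announces entry to and exit from a scope.
class VerboseSketch {
public:
    explicit VerboseSketch(std::string const& desc) : desc_(desc) {
        std::cout << desc_ << " start" << std::endl;
    }
    ~VerboseSketch() {
        std::cout << desc_ << " stop" << std::endl;
    }
    VerboseSketch(VerboseSketch const&) = delete;
    VerboseSketch& operator=(VerboseSketch const&) = delete;

private:
    std::string desc_;
};

// Sleep for `ms` milliseconds, then either return `item` or, if `thrw` is
// true, throw std::runtime_error carrying a description of `item`.
template <typename T>
T sleeper_sketch(T item, int ms, bool thrw = false) {
    assert(ms >= 0);
    std::ostringstream descb;
    descb << item;
    // Start message now; the destructor prints the stop message on the way
    // out, whether we leave by return or by exception.
    VerboseSketch v("sleeper(" + descb.str() + ")");
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    if (thrw) {
        throw std::runtime_error(descb.str());
    }
    return item;
}
```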
[section when_any]
[#wait_first_simple_section]
[section when_any, simple completion]

The simplest case is when you only need to know that the first of a set of
asynchronous tasks has completed [mdash] but you don't need to obtain a return
value, and you're confident that they will not throw exceptions.

[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a
[class_link condition_variable] and a [class_link mutex]:

[wait_done]

The pattern we follow throughout this section is to pass each task's fiber
function a
[@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
to the relevant synchronization object.
This eliminates nagging questions about the lifespan of the synchronization
object relative to the last of the fibers.

[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:

[wait_first_simple]

[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack,
capturing `Done::ptr` for each new fiber:

[wait_first_simple_impl]

The body of the fiber's lambda is extremely simple, as promised: call the
function, notify [link wait_done `Done`] when it returns. The first fiber to
do so allows `wait_first_simple()` to return [mdash] which is why it's useful
to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
rather than declaring it as a stack variable in `wait_first_simple()`.

This is how you might call it:

[wait_first_simple_ex]

In this example, control resumes after `wait_first_simple()` when [link
wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
other two `sleeper()` fibers are still running.

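For readers who want to experiment without fibers, the same shape can be approximated with detached `std::thread`s. This is only a sketch under that substitution: `DoneSketch`, `launch_each()` and `wait_first_simple_sketch()` are hypothetical names, and real threads stand in for the fibers. The point it illustrates is the lifespan argument made above: each worker holds its own `std::shared_ptr<DoneSketch>`, so the object outlives the waiting function even though slower workers notify it later.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical thread-based analogue of Done: a bool guarded by a mutex,
// with a condition_variable to wake the waiter.
class DoneSketch {
public:
    typedef std::shared_ptr<DoneSketch> ptr;

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return ready_; });
    }

    void notify() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ready_ = true;
        }
        cond_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool ready_ = false;
};

// End of the recursion over the argument pack.
inline void launch_each(DoneSketch::ptr const&) {}

// Launch one detached thread per task. Each thread copies the shared_ptr,
// so DoneSketch stays alive until the last worker has notified it.
template <typename Fn, typename... Fns>
void launch_each(DoneSketch::ptr const& done, Fn&& function, Fns&&... functions) {
    assert(done);
    std::thread([done, function]() mutable {
        function();
        done->notify();
    }).detach();
    launch_each(done, std::forward<Fns>(functions)...);
}

// Return as soon as the first task has completed; the others keep running.
template <typename... Fns>
void wait_first_simple_sketch(Fns&&... functions) {
    DoneSketch::ptr done = std::make_shared<DoneSketch>();
    launch_each(done, std::forward<Fns>(functions)...);
    done->wait();
}
```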
[endsect]
[section when_any, return value]

It seems more useful to add the ability to capture the return value from the
first of the task functions to complete. Again, we assume that none will throw
an exception.

One tactic would be to adapt our [link wait_done `Done`] class to store the
first of the return values, rather than a simple `bool`. However, we choose
instead to use a [template_link buffered_channel]. We'll only need to enqueue
the first value, so we'll [member_link buffered_channel..close] it once we've
retrieved that value. Subsequent `push()` calls will return `closed`.

[#wait_first_value]
[wait_first_value]

[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:

[wait_first_value_impl]

It calls the passed function, pushes its return value and ignores the `push()`
result. You might call it like this:

[wait_first_value_ex]

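The close-after-first-value idiom described in this section can be illustrated with a toy channel. The sketch below is not `buffered_channel`: `ChannelSketch`, `op_status` and `take_first_and_close()` are hypothetical names, the channel is unbounded, and its interface is reduced to what the idiom needs. It shows the one property the recipe relies on, namely that a `push()` arriving after `close()` is rejected with a status the producer is free to ignore.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

enum class op_status { success, closed };   // hypothetical status codes

// Hypothetical, unbounded stand-in for a closable channel.
template <typename T>
class ChannelSketch {
public:
    // Enqueue a value unless the channel has been closed.
    op_status push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (closed_) {
                return op_status::closed;
            }
            items_.push_back(std::move(value));
        }
        cond_.notify_one();
        return op_status::success;
    }

    // Block until a value is available, or the channel is closed and drained.
    op_status pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) {
            return op_status::closed;
        }
        out = std::move(items_.front());
        items_.pop_front();
        return op_status::success;
    }

    // Refuse further push() calls and wake any blocked consumers.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cond_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<T> items_;
    bool closed_ = false;
};

// Consumer side of "first value wins": take one value, then close so that
// slower producers' push() calls are rejected instead of queued.
template <typename T>
T take_first_and_close(ChannelSketch<T>& channel) {
    T value{};
    op_status status = channel.pop(value);
    assert(status == op_status::success);
    (void)status;
    channel.close();
    return value;
}
```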
[endsect]
[section when_any, produce first outcome, whether result or exception]

We may not be running in an environment in which we can guarantee no exception
will be thrown by any of our task functions. In that case, the above
implementations of `wait_first_something()` would be naïve: as mentioned in
[link exceptions the section on Fiber Management], an uncaught exception in one
of our task fibers would cause `std::terminate()` to be called.

Let's at least ensure that such an exception would propagate to the fiber
awaiting the first result. We can use [template_link future] to transport
either a return value or an exception. Therefore, we will change [link
wait_first_value `wait_first_value()`]'s [template_link buffered_channel] to
hold `future< T >` items instead of simply `T`.

Once we have a `future<>` in hand, all we need do is call [member_link
future..get], which will either return the value or rethrow the exception.

[#wait_first_outcome]
[wait_first_outcome]

So far so good [mdash] but there's a timing issue. How should we obtain the
`future<>` to [member_link buffered_channel..push] on the queue?

We could call [ns_function_link fibers..async]. That would certainly produce a
`future<>` for the task function. The trouble is that it would return too
quickly! We only want `future<>` items for ['completed] tasks on our
`queue<>`. In fact, we only want the `future<>` for the one that
completes first. If each fiber launched by `wait_first_outcome()` were to
`push()` the result of calling `async()`, the queue would only ever report
the result of the leftmost task item [mdash] ['not] the one that completes most
quickly.

Calling [member_link future..get] on the future returned by `async()` wouldn't
be right. You can only call `get()` once per `future<>` instance! And if there
were an exception, it would be rethrown inside the helper fiber at the
producer end of the queue, rather than propagated to the consumer end.

We could call [member_link future..wait]. That would block the helper fiber
until the `future<>` became ready, at which point we could `push()` it to be
retrieved by `wait_first_outcome()`.

That would work [mdash] but there's a simpler tactic that avoids creating an extra
fiber. We can wrap the task function in a [template_link packaged_task]. While
one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is,
in fact, what `async()` does [mdash] in this case, we're already running in the
helper fiber at the producer end of the queue! We can simply ['call] the
`packaged_task<>`. On return from that call, the task function has completed,
meaning that the `future<>` obtained from the `packaged_task<>` is certain to
be ready. At that point we can simply `push()` it to the queue.

[#wait_first_outcome_impl]
[wait_first_outcome_impl]

Calling it might look like this:

[wait_first_outcome_ex]

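The "call the `packaged_task<>` in place" tactic can be tried in isolation with the standard-library counterparts of these types. In this sketch `run_to_ready_future()` is a hypothetical helper, and `std::packaged_task` and `std::future` stand in for the fiber versions. It demonstrates the property the recipe depends on: once the call returns, the future is already ready and carries either the value or the exception, so in the real recipe it could be pushed onto the queue immediately.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Run `function` right here, on the calling thread, through a
// std::packaged_task, and hand back its future. Because the task has already
// run when this returns, the future is ready: it holds either the return
// value or the exception that `function` threw.
template <typename Fn>
auto run_to_ready_future(Fn&& function) -> std::future<decltype(function())> {
    typedef decltype(function()) T;
    std::packaged_task<T()> task(std::forward<Fn>(function));
    std::future<T> result = task.get_future();
    task();   // an exception thrown by `function` is stored, not propagated
    assert(result.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
    return result;
}
```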
[endsect]
[section when_any, produce first success]

One scenario for ["when_any] functionality is when we're redundantly contacting
some number of possibly-unreliable web services. Not only might they be slow
[mdash] any one of them might produce a failure rather than the desired
result.

In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
right approach. If one of the services produces an error quickly, while
another follows up with a real answer, we don't want to prefer the error just
because it arrived first!

Given the `queue< future< T > >` we already constructed for
`wait_first_outcome()`, though, we can readily recast the interface function
to deliver the first ['successful] result.

That does raise the question: what if ['all] the task functions throw an
exception? In that case we'd probably better know about it.

[#exception_list]
The
[@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
C++ Parallelism Draft Technical Specification] proposes a
`std::exception_list` exception capable of delivering a collection of
`std::exception_ptr`s. Until that becomes universally available, let's fake up
an `exception_list` of our own:

[exception_list]

Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
`wait_first_outcome_impl()`].

Instead of retrieving only the first `future<>` from the queue, we must now
loop over `future<>` items. Of course we must limit that iteration! If we
launch only `count` producer fibers, the `(count+1)`[superscript st]
[member_link buffered_channel..pop] call would block forever.

Given a ready `future<>`, we can distinguish failure by calling [member_link
future..get_exception_ptr]. If the `future<>` in fact contains a result rather
than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
can confidently call [member_link future..get] to return that result to our
caller.

If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
our pending `exception_list` and loop back for the next `future<>` from the
queue.

If we fall out of the loop [mdash] if every single task fiber threw an
exception [mdash] we throw the `exception_list` exception into which we've
been collecting those `std::exception_ptr`s.

[#wait_first_success]
[wait_first_success]

A call might look like this:

[wait_first_success_ex]

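The loop described in this section can be sketched with `std::future`, with two loudly stated differences from the recipe. First, `std::future` has no `get_exception_ptr()`, so the sketch calls `get()` inside a `try` block and captures a failure with `std::current_exception()`. Second, a vector of already-ready futures stands in for the queue, in the order they would have been popped. All names here (`exception_list_sketch`, `first_success_sketch()`, `ready_future()`) are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the exception_list described above.
class exception_list_sketch : public std::runtime_error {
public:
    explicit exception_list_sketch(std::string const& what)
        : std::runtime_error(what) {}
    void add(std::exception_ptr ep) { bundle_.push_back(ep); }
    std::size_t size() const { return bundle_.size(); }

private:
    std::vector<std::exception_ptr> bundle_;
};

// Examine the outcomes in the order given. Return the first stored value;
// if every future holds an exception, throw them all as one list.
template <typename T>
T first_success_sketch(std::vector<std::future<T>>& outcomes) {
    assert(!outcomes.empty());
    exception_list_sketch errors("first_success_sketch() produced only errors");
    // The loop is bounded by the number of outcomes: we never wait for more
    // results than there are producers.
    for (std::future<T>& outcome : outcomes) {
        try {
            return outcome.get();
        } catch (...) {
            errors.add(std::current_exception());
        }
    }
    throw errors;
}

// Demonstration helper: build a future that is already ready by running the
// task in place.
template <typename T, typename Fn>
std::future<T> ready_future(Fn function) {
    std::packaged_task<T()> task(std::move(function));
    std::future<T> result = task.get_future();
    task();
    return result;
}
```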
[endsect]
[section when_any, heterogeneous types]

We would be remiss to ignore the case in which the various task functions have
distinct return types. That means that the value returned by the first of them
might have any one of those types. We can express that with
[@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].

To keep the example simple, we'll revert to pretending that none of them can
throw an exception. That makes `wait_first_value_het()` strongly resemble
[link wait_first_value `wait_first_value()`]. We can actually reuse [link
wait_first_value_impl `wait_first_value_impl()`], merely passing
`boost::variant<T0, T1, ...>` as the queue's value type rather than the
common `T`!

Naturally this could be extended to use [link wait_first_success
`wait_first_success()`] semantics instead.

[wait_first_value_het]

It might be called like this:

[wait_first_value_het_ex]

[endsect]
[section when_any, a dubious alternative]

Certain topics in C++ can arouse strong passions, and exceptions are no
exception. We cannot resist mentioning [mdash] for purely informational
purposes [mdash] that when you need only the ['first] result from some number
of concurrently-running fibers, it would be possible to pass a
[^shared_ptr<[template_link promise]>] to the participating fibers, then cause
the initiating fiber to call [member_link future..get] on its [template_link
future]. The first fiber to call [member_link promise..set_value] on that
shared `promise` will succeed; subsequent `set_value()` calls on the same
`promise` instance will throw `future_error`.

Use this information at your own discretion. Beware the dark side.

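In the same purely informational spirit, here is what that behavior looks like with `std::promise`, used as a stand-in for the fiber `promise`. The helper `try_set_first()` is hypothetical. The sketch assumes that the only `std::future_error` worth swallowing is `promise_already_satisfied`, which is what a second `set_value()` on the same `std::promise` produces; any other error is rethrown.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <string>
#include <utility>

// Try to publish `value` through a promise shared by several producers.
// Returns true for the first caller and false for any later one.
template <typename T>
bool try_set_first(std::shared_ptr<std::promise<T>> const& shared, T value) {
    assert(shared);
    try {
        shared->set_value(std::move(value));
        return true;
    } catch (std::future_error const& e) {
        // A promise can be satisfied only once; that is the one failure we
        // expect here. Anything else is passed on to the caller.
        if (e.code() !=
            std::make_error_code(std::future_errc::promise_already_satisfied)) {
            throw;
        }
        return false;
    }
}
```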
[endsect]
[endsect][/ when_any]

[section when_all functionality]
[section when_all, simple completion]

For the case in which we must wait for ['all] task functions to complete
[mdash] but we don't need results (or expect exceptions) from any of them
[mdash] we can write a `wait_all_simple()` that looks remarkably like [link
wait_first_simple `wait_first_simple()`]. The difference is that instead of
our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
call its [member_link barrier..wait].

We initialize the `barrier` with `(count+1)` because we are launching `count`
fibers, plus the `wait()` call within `wait_all_simple()` itself.

[wait_all_simple]

As stated above, the only difference between `wait_all_simple_impl()` and
[link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
calls `barrier::wait()` rather than `Done::notify()`:

[wait_all_simple_impl]

You might call it like this:

[wait_all_simple_ex]

Control will not return from the `wait_all_simple()` call until the last of
its task functions has completed.

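The `(count+1)` arithmetic is easy to check with a small thread-based sketch. Everything here is an assumption made for illustration: `BarrierSketch` is a hypothetical single-use barrier built from a mutex and a condition variable, detached `std::thread`s replace the fibers, and `launch_each()` and `wait_all_simple_sketch()` are invented names. Each of the `count` workers arrives at the barrier once, and the caller is the one extra participant.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-use barrier: wait() blocks until `count` callers
// have arrived.
class BarrierSketch {
public:
    explicit BarrierSketch(std::size_t count) : remaining_(count) {
        assert(count > 0);
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (--remaining_ == 0) {
            cond_.notify_all();   // the last arrival releases everyone
        } else {
            cond_.wait(lock, [this] { return remaining_ == 0; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::size_t remaining_;
};

// End of the recursion over the argument pack.
inline void launch_each(std::shared_ptr<BarrierSketch> const&) {}

template <typename Fn, typename... Fns>
void launch_each(std::shared_ptr<BarrierSketch> const& barrier,
                 Fn&& function, Fns&&... functions) {
    std::thread([barrier, function]() mutable {
        function();
        barrier->wait();          // one arrival per task
    }).detach();
    launch_each(barrier, std::forward<Fns>(functions)...);
}

template <typename... Fns>
void wait_all_simple_sketch(Fns&&... functions) {
    std::size_t count = sizeof...(functions);
    // `count` tasks plus this caller: hence count + 1 participants.
    auto barrier = std::make_shared<BarrierSketch>(count + 1);
    launch_each(barrier, std::forward<Fns>(functions)...);
    barrier->wait();
}
```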
[endsect]
[section when_all, return values]

As soon as we want to collect return values from all the task functions, we
can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
`queue<T>` for the purpose. All we have to do is avoid closing it after the
first value!

But in fact, collecting multiple values raises an interesting question: do we
['really] want to wait until the slowest of them has arrived? Wouldn't we
rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define `wait_all_values_source()`
to return `shared_ptr<buffered_channel<T>>`.

[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement
`wait_all_values()`:

[wait_all_values]

It might be called like this:

[wait_all_values_ex]

As you can see from the loop in `wait_all_values()`, instead of requiring its
caller to count values, we define `wait_all_values_source()` to [member_link
buffered_channel..close] the queue when done. But how do we do that? Each
producer fiber is independent. It has no idea whether it is the last one to
[member_link buffered_channel..push] a value.

[#wait_nqueue]
We can address that problem with a counting façade for the
`queue<>`. In fact, our façade need only support the producer end of
the queue.

[wait_nqueue]

[#wait_all_values_source]
Armed with `nqueue<>`, we can implement `wait_all_values_source()`. It
starts just like [link wait_first_value `wait_first_value()`]. The difference
is that we wrap the `queue<T>` with an `nqueue<T>` to pass to
the producer fibers.

Then, of course, instead of popping the first value, closing the queue and
returning it, we simply return the `shared_ptr<queue<T>>`.

[wait_all_values_source]

For example:

[wait_all_values_source_ex]

[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl
`wait_first_value_impl()`] except for the use of `nqueue<T>` rather than
`queue<T>`:

[wait_all_values_impl]

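The counting façade idea can be exercised on its own with a toy queue. In this sketch `QueueSketch` and `NQueueSketch` are hypothetical names, `push()` reports success as a plain `bool`, and mutexes are used because the sketch assumes threads rather than fibers. The façade exposes only the producer end: it forwards each `push()` and closes the underlying queue after the expected number of successful pushes, so no producer needs to know whether it is the last.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Minimal closable queue, present only to exercise the facade below.
template <typename T>
class QueueSketch {
public:
    bool push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (closed_) {
            return false;
        }
        items_.push_back(std::move(value));
        return true;
    }
    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
    }
    bool is_closed() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return closed_;
    }
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;
    std::deque<T> items_;
    bool closed_ = false;
};

// Producer-side facade: forwards push() and closes the underlying queue
// once `count` values have been pushed successfully.
template <typename T>
class NQueueSketch {
public:
    NQueueSketch(std::shared_ptr<QueueSketch<T>> queue, std::size_t count)
        : queue_(std::move(queue)), remaining_(count) {
        assert(queue_ && count > 0);
    }

    bool push(T value) {
        bool pushed = queue_->push(std::move(value));
        bool last = false;
        if (pushed) {
            std::lock_guard<std::mutex> lock(mutex_);
            last = (--remaining_ == 0);
        }
        if (last) {
            queue_->close();      // the final producer closes for everyone
        }
        return pushed;
    }

private:
    std::shared_ptr<QueueSketch<T>> queue_;
    std::mutex mutex_;
    std::size_t remaining_;
};
```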
[endsect]
[section when_all until first exception]

Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
can elaborate [link wait_all_values `wait_all_values()`] and [link
wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
instead of plain `T`.

[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link
future..get]:

[wait_all_until_error]

For example:

[wait_all_until_error_ex]

[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The
caller must both retrieve a `future< T >` and call its `get()` method. It would,
of course, be possible to return a façade over the consumer end of the
queue that would implicitly perform the `get()` and return a simple `T` (or
throw).

The implementation is just as you would expect. Notice, however, that we can
reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
`nqueue<T>` rather than `queue<T>`.

[wait_all_until_error_source]

For example:

[wait_all_until_error_source_ex]

[endsect]
[section wait_all, collecting all exceptions]

[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
might be more reasonable to make a `wait_all_...()` that collects ['all]
errors instead of presenting only the first:

[wait_all_collect_errors]

The implementation is a simple variation on [link wait_first_success
`wait_first_success()`], using the same [link exception_list `exception_list`]
exception class.

[endsect]
[section when_all, heterogeneous types]

But what about the case when we must wait for all results of different types?

We can present an API that is frankly quite cool. Consider a sample struct:

[wait_Data]

Let's fill its members from task functions all running concurrently:

[wait_all_members_data_ex]

Note that for this case, we abandon the notion of capturing the earliest
result first, and so on: we must fill exactly the passed struct in
left-to-right order.

That permits a beautifully simple implementation:

[wait_all_members]

[wait_all_members_get]

It is tempting to try to implement `wait_all_members()` as a one-liner like
this:

    return Result{ boost::fibers::async(functions).get()... };

The trouble with this tactic is that it would serialize all the task
functions. The runtime makes a single pass through `functions`, calling
[ns_function_link fibers..async] for each and then immediately calling
[member_link future..get] on its returned `future<>`. That blocks the implicit
loop. The above is almost equivalent to writing:

    return Result{ functions()... };

in which, of course, there is no concurrency at all.

Passing the argument pack through a function-call boundary
(`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
`wait_all_members()` to collect the `future<>`s from all the `async()` calls,
the second in `wait_all_members_get()` to fetch each of the results.

As noted in comments, within the `wait_all_members_get()` parameter pack
expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the
way, we will hit the `get()` for the slowest task function; after that every
subsequent `get()` will complete in trivial time.

By the way, we could also use this same API to fill a vector or other
collection:

[wait_all_members_vector_ex]

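The two-pass structure can be reproduced with `std::async`, which we substitute here for the fiber `async()`; passing `std::launch::async` makes each task start on its own thread rather than being deferred. `DataSketch`, `wait_all_members_sketch()` and `wait_all_members_get_sketch()` are hypothetical names, and the sketch assumes each task's return type matches the corresponding member exactly. All the `std::async()` calls are evaluated as function arguments before the callee begins, so every task is already running when the first `get()` blocks.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <thread>
#include <utility>

// Hypothetical aggregate to be filled member by member.
struct DataSketch {
    std::string str;
    double      inexact;
    int         exact;
};

// Second pass: every task is already running, so blocking in get() on one
// future no longer delays the start of the others. Braced initialization
// evaluates the get() calls left to right, matching member order.
template <typename Result, typename... Futures>
Result wait_all_members_get_sketch(Futures&&... futures) {
    return Result{ futures.get()... };
}

// First pass: start every task and collect its future without waiting.
template <typename Result, typename... Fns>
Result wait_all_members_sketch(Fns&&... functions) {
    return wait_all_members_get_sketch<Result>(
        std::async(std::launch::async, std::forward<Fns>(functions))...);
}
```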
[endsect]
[endsect][/ when_all]

[endsect][/ outermost]
