• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //          Copyright Nat Goodspeed + Oliver Kowalke 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 //    (See accompanying file LICENSE_1_0.txt or copy at
4 //          http://www.boost.org/LICENSE_1_0.txt)
5 
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

#include <boost/assert.hpp>

#include <boost/fiber/all.hpp>

#include <boost/fiber/detail/thread_barrier.hpp>
22 
23 static std::size_t fiber_count{ 0 };
24 static std::mutex mtx_count{};
25 static boost::fibers::condition_variable_any cnd_count{};
26 typedef std::unique_lock< std::mutex > lock_type;
27 
28 /*****************************************************************************
29 *   example fiber function
30 *****************************************************************************/
31 //[fiber_fn_ws
whatevah(char me)32 void whatevah( char me) {
33     try {
34         std::thread::id my_thread = std::this_thread::get_id(); /*< get ID of initial thread >*/
35         {
36             std::ostringstream buffer;
37             buffer << "fiber " << me << " started on thread " << my_thread << '\n';
38             std::cout << buffer.str() << std::flush;
39         }
40         for ( unsigned i = 0; i < 10; ++i) { /*< loop ten times >*/
41             boost::this_fiber::yield(); /*< yield to other fibers >*/
42             std::thread::id new_thread = std::this_thread::get_id(); /*< get ID of current thread >*/
43             if ( new_thread != my_thread) { /*< test if fiber was migrated to another thread >*/
44                 my_thread = new_thread;
45                 std::ostringstream buffer;
46                 buffer << "fiber " << me << " switched to thread " << my_thread << '\n';
47                 std::cout << buffer.str() << std::flush;
48             }
49         }
50     } catch ( ... ) {
51     }
52     lock_type lk( mtx_count);
53     if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
54         lk.unlock();
55         cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
56     }
57 }
58 //]
59 
60 /*****************************************************************************
61 *   example thread function
62 *****************************************************************************/
63 //[thread_fn_ws
thread(boost::fibers::detail::thread_barrier * b)64 void thread( boost::fibers::detail::thread_barrier * b) {
65     std::ostringstream buffer;
66     buffer << "thread started " << std::this_thread::get_id() << std::endl;
67     std::cout << buffer.str() << std::flush;
68     boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
69         Install the scheduling algorithm `boost::fibers::algo::shared_work` in order to
70         join the work sharing.
71     >*/
72     b->wait(); /*< sync with other threads: allow them to start processing >*/
73     lock_type lk( mtx_count);
74     cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
75         Suspend main fiber and resume worker fibers in the meanwhile.
76         Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
77         if all worker fibers are complete.
78     >*/
79     BOOST_ASSERT( 0 == fiber_count);
80 }
81 //]
82 
83 /*****************************************************************************
84 *   main()
85 *****************************************************************************/
main(int argc,char * argv[])86 int main( int argc, char *argv[]) {
87     std::cout << "main thread started " << std::this_thread::get_id() << std::endl;
88 //[main_ws
89     boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
90         Install the scheduling algorithm `boost::fibers::algo::shared_work` in the main thread
91         too, so each new fiber gets launched into the shared pool.
92     >*/
93 
94     for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*<
95         Launch a number of worker fibers; each worker fiber picks up a character
96         that is passed as parameter to fiber-function `whatevah`.
97         Each worker fiber gets detached.
98     >*/
99         boost::fibers::fiber([c](){ whatevah( c); }).detach();
100         ++fiber_count; /*< Increment fiber counter for each new fiber. >*/
101     }
102     boost::fibers::detail::thread_barrier b( 4);
103     std::thread threads[] = { /*<
104         Launch a couple of threads that join the work sharing.
105     >*/
106         std::thread( thread, & b),
107         std::thread( thread, & b),
108         std::thread( thread, & b)
109     };
110     b.wait(); /*< sync with other threads: allow them to start processing >*/
111     {
112         lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count);
113         cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
114             Suspend main fiber and resume worker fibers in the meanwhile.
115             Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
116             if all worker fibers are complete.
117         >*/
118     } /*<
119         Releasing lock of mtx_count is required before joining the threads, otherwise
120         the other threads would be blocked inside condition_variable::wait() and
121         would never return (deadlock).
122     >*/
123     BOOST_ASSERT( 0 == fiber_count);
124     for ( std::thread & t : threads) { /*< wait for threads to terminate >*/
125         t.join();
126     }
127 //]
128     std::cout << "done." << std::endl;
129     return EXIT_SUCCESS;
130 }
131