1 // Copyright Nat Goodspeed + Oliver Kowalke 2015. 2 // Distributed under the Boost Software License, Version 1.0. 3 // (See accompanying file LICENSE_1_0.txt or copy at 4 // http://www.boost.org/LICENSE_1_0.txt) 5 6 #include <chrono> 7 #include <condition_variable> 8 #include <cstddef> 9 #include <deque> 10 #include <iomanip> 11 #include <iostream> 12 #include <mutex> 13 #include <sstream> 14 #include <string> 15 #include <thread> 16 17 #include <boost/assert.hpp> 18 19 #include <boost/fiber/all.hpp> 20 21 #include <boost/fiber/detail/thread_barrier.hpp> 22 23 static std::size_t fiber_count{ 0 }; 24 static std::mutex mtx_count{}; 25 static boost::fibers::condition_variable_any cnd_count{}; 26 typedef std::unique_lock< std::mutex > lock_type; 27 28 /***************************************************************************** 29 * example fiber function 30 *****************************************************************************/ 31 //[fiber_fn_ws whatevah(char me)32void whatevah( char me) { 33 try { 34 std::thread::id my_thread = std::this_thread::get_id(); /*< get ID of initial thread >*/ 35 { 36 std::ostringstream buffer; 37 buffer << "fiber " << me << " started on thread " << my_thread << '\n'; 38 std::cout << buffer.str() << std::flush; 39 } 40 for ( unsigned i = 0; i < 10; ++i) { /*< loop ten times >*/ 41 boost::this_fiber::yield(); /*< yield to other fibers >*/ 42 std::thread::id new_thread = std::this_thread::get_id(); /*< get ID of current thread >*/ 43 if ( new_thread != my_thread) { /*< test if fiber was migrated to another thread >*/ 44 my_thread = new_thread; 45 std::ostringstream buffer; 46 buffer << "fiber " << me << " switched to thread " << my_thread << '\n'; 47 std::cout << buffer.str() << std::flush; 48 } 49 } 50 } catch ( ... ) { 51 } 52 lock_type lk( mtx_count); 53 if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/ 54 lk.unlock(); 55 cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/ 56 } 57 } 58 //] 59 60 /***************************************************************************** 61 * example thread function 62 *****************************************************************************/ 63 //[thread_fn_ws thread()64void thread() { 65 std::ostringstream buffer; 66 buffer << "thread started " << std::this_thread::get_id() << std::endl; 67 std::cout << buffer.str() << std::flush; 68 boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( 4); /*< 69 Install the scheduling algorithm `boost::fibers::algo::work_stealing` in order to 70 join the work sharing. 71 >*/ 72 lock_type lk( mtx_count); 73 cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*< 74 Suspend main fiber and resume worker fibers in the meanwhile. 75 Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`) 76 if all worker fibers are complete. 77 >*/ 78 BOOST_ASSERT( 0 == fiber_count); 79 } 80 //] 81 82 /***************************************************************************** 83 * main() 84 *****************************************************************************/ main(int argc,char * argv[])85int main( int argc, char *argv[]) { 86 std::cout << "main thread started " << std::this_thread::get_id() << std::endl; 87 //[main_ws 88 for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*< 89 Launch a number of worker fibers; each worker fiber picks up a character 90 that is passed as parameter to fiber-function `whatevah`. 91 Each worker fiber gets detached. 92 >*/ 93 boost::fibers::fiber([c](){ whatevah( c); }).detach(); 94 ++fiber_count; /*< Increment fiber counter for each new fiber. >*/ 95 } 96 std::thread threads[] = { /*< 97 Launch a couple of threads that join the work sharing. 98 >*/ 99 std::thread( thread), 100 std::thread( thread), 101 std::thread( thread) 102 }; 103 boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( 4); /*< 104 Install the scheduling algorithm `boost::fibers::algo::work_stealing` in the main thread 105 too, so each new fiber gets launched into the shared pool. 106 >*/ 107 { 108 lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count); 109 cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*< 110 Suspend main fiber and resume worker fibers in the meanwhile. 111 Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`) 112 if all worker fibers are complete. 113 >*/ 114 } /*< 115 Releasing lock of mtx_count is required before joining the threads, otherwise 116 the other threads would be blocked inside condition_variable::wait() and 117 would never return (deadlock). 118 >*/ 119 BOOST_ASSERT( 0 == fiber_count); 120 for ( std::thread & t : threads) { /*< wait for threads to terminate >*/ 121 t.join(); 122 } 123 //] 124 std::cout << "done." << std::endl; 125 return EXIT_SUCCESS; 126 } 127