1 // Copyright (C) 2013-2014 Vicente J. Botet Escriba 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 // 6 // 2013/09 Vicente J. Botet Escriba 7 // Adapt to boost from CCIA C++11 implementation 8 // first implementation of a simple pool thread using a vector of threads and a sync_queue. 9 10 #ifndef BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP 11 #define BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP 12 13 #include <boost/thread/detail/config.hpp> 14 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE 15 16 #include <boost/thread/detail/delete.hpp> 17 #include <boost/thread/detail/move.hpp> 18 #include <boost/thread/thread.hpp> 19 #include <boost/thread/concurrent_queues/sync_queue.hpp> 20 #include <boost/thread/executors/work.hpp> 21 #include <boost/thread/csbl/vector.hpp> 22 23 #include <boost/config/abi_prefix.hpp> 24 25 namespace boost 26 { 27 namespace executors 28 { 29 class basic_thread_pool 30 { 31 public: 32 /// type-erasure to store the works to do 33 typedef executors::work work; 34 private: 35 typedef thread thread_t; 36 /// A move aware vector type 37 typedef csbl::vector<thread_t> thread_vector; 38 39 /// A move aware vector 40 thread_vector threads; 41 /// the thread safe work queue 42 concurrent::sync_queue<work > work_queue; 43 44 public: 45 /** 46 * Effects: try to execute one task. 47 * Returns: whether a task has been executed. 48 * Throws: whatever the current task constructor throws or the task() throws. 49 */ try_executing_one()50 bool try_executing_one() 51 { 52 try 53 { 54 work task; 55 if (work_queue.try_pull(task) == queue_op_status::success) 56 { 57 task(); 58 return true; 59 } 60 return false; 61 } 62 catch (...) 63 { 64 std::terminate(); 65 //return false; 66 } 67 } 68 /** 69 * Effects: schedule one task or yields 70 * Throws: whatever the current task constructor throws or the task() throws. 71 */ schedule_one_or_yield()72 void schedule_one_or_yield() 73 { 74 if ( ! try_executing_one()) 75 { 76 this_thread::yield(); 77 } 78 } 79 private: 80 81 /** 82 * The main loop of the worker threads 83 */ worker_thread()84 void worker_thread() 85 { 86 try 87 { 88 for(;;) 89 { 90 work task; 91 try 92 { 93 queue_op_status st = work_queue.wait_pull(task); 94 if (st == queue_op_status::closed) { 95 return; 96 } 97 task(); 98 } 99 catch (boost::thread_interrupted&) 100 { 101 return; 102 } 103 } 104 } 105 catch (...) 106 { 107 std::terminate(); 108 return; 109 } 110 } 111 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) 112 template <class AtThreadEntry> worker_thread1(AtThreadEntry & at_thread_entry)113 void worker_thread1(AtThreadEntry& at_thread_entry) 114 { 115 at_thread_entry(*this); 116 worker_thread(); 117 } 118 #endif worker_thread2(void (* at_thread_entry)(basic_thread_pool &))119 void worker_thread2(void(*at_thread_entry)(basic_thread_pool&)) 120 { 121 at_thread_entry(*this); 122 worker_thread(); 123 } 124 template <class AtThreadEntry> worker_thread3(BOOST_THREAD_FWD_REF (AtThreadEntry)at_thread_entry)125 void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry) 126 { 127 at_thread_entry(*this); 128 worker_thread(); 129 } do_nothing_at_thread_entry(basic_thread_pool &)130 static void do_nothing_at_thread_entry(basic_thread_pool&) {} 131 132 public: 133 /// basic_thread_pool is not copyable. 134 BOOST_THREAD_NO_COPYABLE(basic_thread_pool) 135 136 /** 137 * \b Effects: creates a thread pool that runs closures on \c thread_count threads. 138 * 139 * \b Throws: Whatever exception is thrown while initializing the needed resources. 140 */ basic_thread_pool(unsigned const thread_count=thread::hardware_concurrency ()+1)141 basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1) 142 { 143 try 144 { 145 threads.reserve(thread_count); 146 for (unsigned i = 0; i < thread_count; ++i) 147 { 148 #if 1 149 thread th (&basic_thread_pool::worker_thread, this); 150 threads.push_back(thread_t(boost::move(th))); 151 #else 152 threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile 153 #endif 154 } 155 } 156 catch (...) 157 { 158 close(); 159 throw; 160 } 161 } 162 /** 163 * \b Effects: creates a thread pool that runs closures on \c thread_count threads 164 * and executes the at_thread_entry function at the entry of each created thread. . 165 * 166 * \b Throws: Whatever exception is thrown while initializing the needed resources. 167 */ 168 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) 169 template <class AtThreadEntry> basic_thread_pool(unsigned const thread_count,AtThreadEntry & at_thread_entry)170 basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry) 171 { 172 try 173 { 174 threads.reserve(thread_count); 175 for (unsigned i = 0; i < thread_count; ++i) 176 { 177 thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry); 178 threads.push_back(thread_t(boost::move(th))); 179 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile 180 } 181 } 182 catch (...) 183 { 184 close(); 185 throw; 186 } 187 } 188 #endif basic_thread_pool(unsigned const thread_count,void (* at_thread_entry)(basic_thread_pool &))189 basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&)) 190 { 191 try 192 { 193 threads.reserve(thread_count); 194 for (unsigned i = 0; i < thread_count; ++i) 195 { 196 thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry); 197 threads.push_back(thread_t(boost::move(th))); 198 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile 199 } 200 } 201 catch (...) 202 { 203 close(); 204 throw; 205 } 206 } 207 template <class AtThreadEntry> basic_thread_pool(unsigned const thread_count,BOOST_THREAD_FWD_REF (AtThreadEntry)at_thread_entry)208 basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry) 209 { 210 try 211 { 212 threads.reserve(thread_count); 213 for (unsigned i = 0; i < thread_count; ++i) 214 { 215 thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry)); 216 threads.push_back(thread_t(boost::move(th))); 217 //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile 218 } 219 } 220 catch (...) 221 { 222 close(); 223 throw; 224 } 225 } 226 /** 227 * \b Effects: Destroys the thread pool. 228 * 229 * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor. 230 */ ~basic_thread_pool()231 ~basic_thread_pool() 232 { 233 // signal to all the worker threads that there will be no more submissions. 234 close(); 235 // joins all the threads before destroying the thread pool resources (e.g. the queue). 236 interrupt_and_join(); 237 } 238 239 /** 240 * \b Effects: join all the threads. 241 */ join()242 void join() 243 { 244 for (unsigned i = 0; i < threads.size(); ++i) 245 { 246 //threads[i].interrupt(); 247 threads[i].join(); 248 } 249 } 250 251 /** 252 * \b Effects: interrupt all the threads. 253 */ interrupt()254 void interrupt() 255 { 256 for (unsigned i = 0; i < threads.size(); ++i) 257 { 258 threads[i].interrupt(); 259 } 260 } 261 262 /** 263 * \b Effects: interrupt and join all the threads. 264 */ interrupt_and_join()265 void interrupt_and_join() 266 { 267 for (unsigned i = 0; i < threads.size(); ++i) 268 { 269 threads[i].interrupt(); 270 threads[i].join(); 271 } 272 } 273 274 /** 275 * \b Effects: close the \c basic_thread_pool for submissions. 276 * The worker threads will work until there is no more closures to run. 277 */ close()278 void close() 279 { 280 work_queue.close(); 281 } 282 283 /** 284 * \b Returns: whether the pool is closed for submissions. 285 */ closed()286 bool closed() 287 { 288 return work_queue.closed(); 289 } 290 291 /** 292 * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. 293 * 294 * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. 295 * If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads. 296 * 297 * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. 298 * 299 * \b Throws: \c sync_queue_is_closed if the thread pool is closed. 300 * Whatever exception that can be throw while storing the closure. 301 */ submit(BOOST_THREAD_RV_REF (work)closure)302 void submit(BOOST_THREAD_RV_REF(work) closure) { 303 work_queue.push(boost::move(closure)); 304 } 305 306 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) 307 template <typename Closure> submit(Closure & closure)308 void submit(Closure & closure) 309 { 310 submit(work(closure)); 311 } 312 #endif submit(void (* closure)())313 void submit(void (*closure)()) 314 { 315 submit(work(closure)); 316 } 317 318 template <typename Closure> submit(BOOST_THREAD_FWD_REF (Closure)closure)319 void submit(BOOST_THREAD_FWD_REF(Closure) closure) 320 { 321 //submit(work(boost::forward<Closure>(closure))); 322 work w((boost::forward<Closure>(closure))); 323 submit(boost::move(w)); 324 } 325 326 /** 327 * \b Requires: This must be called from an scheduled task. 328 * 329 * \b Effects: reschedule functions until pred() 330 */ 331 template <typename Pred> reschedule_until(Pred const & pred)332 bool reschedule_until(Pred const& pred) 333 { 334 do { 335 if ( ! try_executing_one()) 336 { 337 return false; 338 } 339 } while (! pred()); 340 return true; 341 } 342 343 }; 344 } 345 using executors::basic_thread_pool; 346 347 } 348 349 #include <boost/config/abi_suffix.hpp> 350 351 #endif 352 #endif 353