1 // Copyright (C) 2013 Vicente J. Botet Escriba 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 // 6 // 2013/11 Vicente J. Botet Escriba 7 // first implementation of a simple serial scheduler. 8 9 #ifndef BOOST_THREAD_SERIAL_EXECUTOR_HPP 10 #define BOOST_THREAD_SERIAL_EXECUTOR_HPP 11 12 #include <boost/thread/detail/config.hpp> 13 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE 14 15 #include <exception> 16 #include <boost/thread/detail/delete.hpp> 17 #include <boost/thread/detail/move.hpp> 18 #include <boost/thread/concurrent_queues/sync_queue.hpp> 19 #include <boost/thread/executors/work.hpp> 20 #include <boost/thread/executors/generic_executor_ref.hpp> 21 #include <boost/thread/future.hpp> 22 #include <boost/thread/scoped_thread.hpp> 23 24 #include <boost/config/abi_prefix.hpp> 25 26 #if defined(BOOST_MSVC) 27 # pragma warning(push) 28 # pragma warning(disable: 4355) // 'this' : used in base member initializer list 29 #endif 30 31 namespace boost 32 { 33 namespace executors 34 { 35 class serial_executor 36 { 37 public: 38 /// type-erasure to store the works to do 39 typedef executors::work work; 40 private: 41 typedef scoped_thread<> thread_t; 42 43 /// the thread safe work queue 44 concurrent::sync_queue<work > work_queue; 45 generic_executor_ref ex; 46 thread_t thr; 47 48 struct try_executing_one_task { 49 work& task; 50 boost::promise<void> &p; try_executing_one_taskboost::executors::serial_executor::try_executing_one_task51 try_executing_one_task(work& task, boost::promise<void> &p) 52 : task(task), p(p) {} operator ()boost::executors::serial_executor::try_executing_one_task53 void operator()() { 54 try { 55 task(); 56 p.set_value(); 57 } catch (...) 58 { 59 p.set_exception(current_exception()); 60 } 61 } 62 }; 63 public: 64 /** 65 * \par Returns 66 * The underlying executor wrapped on a generic executor reference. 67 */ underlying_executor()68 generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; } 69 70 /** 71 * Effects: try to execute one task. 72 * Returns: whether a task has been executed. 73 * Throws: whatever the current task constructor throws or the task() throws. 74 */ try_executing_one()75 bool try_executing_one() 76 { 77 work task; 78 try 79 { 80 if (work_queue.try_pull(task) == queue_op_status::success) 81 { 82 boost::promise<void> p; 83 try_executing_one_task tmp(task,p); 84 ex.submit(tmp); 85 p.get_future().wait(); 86 return true; 87 } 88 return false; 89 } 90 catch (...) 91 { 92 std::terminate(); 93 //return false; 94 } 95 } 96 private: 97 /** 98 * Effects: schedule one task or yields 99 * Throws: whatever the current task constructor throws or the task() throws. 100 */ schedule_one_or_yield()101 void schedule_one_or_yield() 102 { 103 if ( ! try_executing_one()) 104 { 105 this_thread::yield(); 106 } 107 } 108 109 /** 110 * The main loop of the worker thread 111 */ worker_thread()112 void worker_thread() 113 { 114 while (!closed()) 115 { 116 schedule_one_or_yield(); 117 } 118 while (try_executing_one()) 119 { 120 } 121 } 122 123 public: 124 /// serial_executor is not copyable. 125 BOOST_THREAD_NO_COPYABLE(serial_executor) 126 127 /** 128 * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. 129 * 130 * \b Throws: Whatever exception is thrown while initializing the needed resources. 131 */ 132 template <class Executor> serial_executor(Executor & ex)133 serial_executor(Executor& ex) 134 : ex(ex), thr(&serial_executor::worker_thread, this) 135 { 136 } 137 /** 138 * \b Effects: Destroys the thread pool. 139 * 140 * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor. 141 */ ~serial_executor()142 ~serial_executor() 143 { 144 // signal to the worker thread that there will be no more submissions. 145 close(); 146 } 147 148 /** 149 * \b Effects: close the \c serial_executor for submissions. 150 * The loop will work until there is no more closures to run. 151 */ close()152 void close() 153 { 154 work_queue.close(); 155 } 156 157 /** 158 * \b Returns: whether the pool is closed for submissions. 159 */ closed()160 bool closed() 161 { 162 return work_queue.closed(); 163 } 164 165 /** 166 * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. 167 * 168 * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. 169 * If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads. 170 * 171 * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. 172 * 173 * \b Throws: \c sync_queue_is_closed if the thread pool is closed. 174 * Whatever exception that can be throw while storing the closure. 175 */ submit(BOOST_THREAD_RV_REF (work)closure)176 void submit(BOOST_THREAD_RV_REF(work) closure) 177 { 178 work_queue.push(boost::move(closure)); 179 } 180 181 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) 182 template <typename Closure> submit(Closure & closure)183 void submit(Closure & closure) 184 { 185 submit(work(closure)); 186 } 187 #endif submit(void (* closure)())188 void submit(void (*closure)()) 189 { 190 submit(work(closure)); 191 } 192 193 template <typename Closure> submit(BOOST_THREAD_FWD_REF (Closure)closure)194 void submit(BOOST_THREAD_FWD_REF(Closure) closure) 195 { 196 work w((boost::forward<Closure>(closure))); 197 submit(boost::move(w)); 198 } 199 200 /** 201 * \b Requires: This must be called from an scheduled task. 202 * 203 * \b Effects: reschedule functions until pred() 204 */ 205 template <typename Pred> reschedule_until(Pred const & pred)206 bool reschedule_until(Pred const& pred) 207 { 208 do { 209 if ( ! try_executing_one()) 210 { 211 return false; 212 } 213 } while (! pred()); 214 return true; 215 } 216 217 }; 218 } 219 using executors::serial_executor; 220 } 221 222 #if defined(BOOST_MSVC) 223 # pragma warning(pop) 224 #endif 225 226 #include <boost/config/abi_suffix.hpp> 227 228 #endif 229 #endif 230