1 // Copyright (C) 2013,2014 Vicente J. Botet Escriba 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See accompanying 4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5 // 6 // 2013/11 Vicente J. Botet Escriba 7 // first implementation of a simple user scheduler. 8 // 2013/11 Vicente J. Botet Escriba 9 // rename loop_executor. 10 11 #ifndef BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP 12 #define BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP 13 14 #include <boost/thread/detail/config.hpp> 15 16 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE 17 18 #include <boost/thread/detail/delete.hpp> 19 #include <boost/thread/detail/move.hpp> 20 #include <boost/thread/concurrent_queues/sync_queue.hpp> 21 #include <boost/thread/executors/work.hpp> 22 #include <boost/assert.hpp> 23 24 #include <boost/config/abi_prefix.hpp> 25 26 namespace boost 27 { 28 namespace executors 29 { 30 31 class loop_executor 32 { 33 public: 34 /// type-erasure to store the works to do 35 typedef executors::work work; 36 private: 37 /// the thread safe work queue 38 concurrent::sync_queue<work > work_queue; 39 40 public: 41 /** 42 * Effects: try to execute one task. 43 * Returns: whether a task has been executed. 44 * Throws: whatever the current task constructor throws or the task() throws. 45 */ try_executing_one()46 bool try_executing_one() 47 { 48 return execute_one(/*wait:*/false); 49 } 50 51 private: 52 /** 53 * Effects: Execute one task. 54 * Remark: If wait is true, waits until a task is available or the executor 55 * is closed. If wait is false, returns false immediately if no 56 * task is available. 57 * Returns: whether a task has been executed (if wait is true, only returns false if closed). 58 * Throws: whatever the current task constructor throws or the task() throws. 59 */ execute_one(bool wait)60 bool execute_one(bool wait) 61 { 62 work task; 63 try 64 { 65 queue_op_status status = wait ? 66 work_queue.wait_pull(task) : 67 work_queue.try_pull(task); 68 if (status == queue_op_status::success) 69 { 70 task(); 71 return true; 72 } 73 BOOST_ASSERT(!wait || status == queue_op_status::closed); 74 return false; 75 } 76 catch (...) 77 { 78 std::terminate(); 79 //return false; 80 } 81 } 82 83 public: 84 /// loop_executor is not copyable. 85 BOOST_THREAD_NO_COPYABLE(loop_executor) 86 87 /** 88 * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. 89 * 90 * \b Throws: Whatever exception is thrown while initializing the needed resources. 91 */ loop_executor()92 loop_executor() 93 { 94 } 95 /** 96 * \b Effects: Destroys the thread pool. 97 * 98 * \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor. 99 */ ~loop_executor()100 ~loop_executor() 101 { 102 // signal to all the worker thread that there will be no more submissions. 103 close(); 104 } 105 106 /** 107 * The main loop of the worker thread 108 */ loop()109 void loop() 110 { 111 while (execute_one(/*wait:*/true)) 112 { 113 } 114 BOOST_ASSERT(closed()); 115 while (try_executing_one()) 116 { 117 } 118 } 119 120 /** 121 * \b Effects: close the \c loop_executor for submissions. 122 * The loop will work until there is no more closures to run. 123 */ close()124 void close() 125 { 126 work_queue.close(); 127 } 128 129 /** 130 * \b Returns: whether the pool is closed for submissions. 131 */ closed()132 bool closed() 133 { 134 return work_queue.closed(); 135 } 136 137 /** 138 * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. 139 * 140 * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. 141 * If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads. 142 * 143 * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. 144 * 145 * \b Throws: \c sync_queue_is_closed if the thread pool is closed. 146 * Whatever exception that can be throw while storing the closure. 147 */ submit(BOOST_THREAD_RV_REF (work)closure)148 void submit(BOOST_THREAD_RV_REF(work) closure) { 149 work_queue.push(boost::move(closure)); 150 } 151 152 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) 153 template <typename Closure> submit(Closure & closure)154 void submit(Closure & closure) 155 { 156 submit(work(closure)); 157 } 158 #endif 159 submit(void (* closure)())160 void submit(void (*closure)()) 161 { 162 submit(work(closure)); 163 } 164 165 template <typename Closure> submit(BOOST_THREAD_FWD_REF (Closure)closure)166 void submit(BOOST_THREAD_FWD_REF(Closure) closure) 167 { 168 //work_queue.push(work(boost::forward<Closure>(closure))); 169 work w((boost::forward<Closure>(closure))); 170 submit(boost::move(w)); 171 } 172 173 /** 174 * \b Requires: This must be called from an scheduled task. 175 * 176 * \b Effects: reschedule functions until pred() 177 */ 178 template <typename Pred> reschedule_until(Pred const & pred)179 bool reschedule_until(Pred const& pred) 180 { 181 do { 182 if ( ! try_executing_one()) 183 { 184 return false; 185 } 186 } while (! pred()); 187 return true; 188 } 189 190 /** 191 * run queued closures 192 */ run_queued_closures()193 void run_queued_closures() 194 { 195 sync_queue<work>::underlying_queue_type q = work_queue.underlying_queue(); 196 while (! q.empty()) 197 { 198 work& task = q.front(); 199 task(); 200 q.pop_front(); 201 } 202 } 203 204 }; 205 } 206 using executors::loop_executor; 207 208 } 209 210 #include <boost/config/abi_suffix.hpp> 211 #endif 212 213 #endif 214