• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2013,2014 Vicente J. Botet Escriba
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 //
6 // 2013/11 Vicente J. Botet Escriba
7 //    first implementation of a simple user scheduler.
8 // 2013/11 Vicente J. Botet Escriba
9 //    rename loop_executor.
10 
11 #ifndef BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP
12 #define BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP
13 
14 #include <boost/thread/detail/config.hpp>
15 
16 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
17 
18 #include <boost/thread/detail/delete.hpp>
19 #include <boost/thread/detail/move.hpp>
20 #include <boost/thread/concurrent_queues/sync_queue.hpp>
21 #include <boost/thread/executors/work.hpp>
22 #include <boost/assert.hpp>
23 
24 #include <boost/config/abi_prefix.hpp>
25 
26 namespace boost
27 {
28 namespace executors
29 {
30 
31   class loop_executor
32   {
33   public:
34     /// type-erasure to store the works to do
35     typedef  executors::work work;
36   private:
37     /// the thread safe work queue
38     concurrent::sync_queue<work > work_queue;
39 
40   public:
41     /**
42      * Effects: try to execute one task.
43      * Returns: whether a task has been executed.
44      * Throws: whatever the current task constructor throws or the task() throws.
45      */
try_executing_one()46     bool try_executing_one()
47     {
48       return execute_one(/*wait:*/false);
49     }
50 
51   private:
52     /**
53      * Effects: Execute one task.
54      * Remark: If wait is true, waits until a task is available or the executor
55      *         is closed. If wait is false, returns false immediately if no
56      *         task is available.
57      * Returns: whether a task has been executed (if wait is true, only returns false if closed).
58      * Throws: whatever the current task constructor throws or the task() throws.
59      */
execute_one(bool wait)60     bool execute_one(bool wait)
61     {
62       work task;
63       try
64       {
65         queue_op_status status = wait ?
66           work_queue.wait_pull(task) :
67           work_queue.try_pull(task);
68         if (status == queue_op_status::success)
69         {
70           task();
71           return true;
72         }
73         BOOST_ASSERT(!wait || status == queue_op_status::closed);
74         return false;
75       }
76       catch (...)
77       {
78         std::terminate();
79         //return false;
80       }
81     }
82 
83   public:
84     /// loop_executor is not copyable.
85     BOOST_THREAD_NO_COPYABLE(loop_executor)
86 
87     /**
88      * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
89      *
90      * \b Throws: Whatever exception is thrown while initializing the needed resources.
91      */
loop_executor()92     loop_executor()
93     {
94     }
95     /**
96      * \b Effects: Destroys the thread pool.
97      *
98      * \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor.
99      */
~loop_executor()100     ~loop_executor()
101     {
102       // signal to all the worker thread that there will be no more submissions.
103       close();
104     }
105 
106     /**
107      * The main loop of the worker thread
108      */
loop()109     void loop()
110     {
111       while (execute_one(/*wait:*/true))
112       {
113       }
114       BOOST_ASSERT(closed());
115       while (try_executing_one())
116       {
117       }
118     }
119 
120     /**
121      * \b Effects: close the \c loop_executor for submissions.
122      * The loop will work until there is no more closures to run.
123      */
close()124     void close()
125     {
126       work_queue.close();
127     }
128 
129     /**
130      * \b Returns: whether the pool is closed for submissions.
131      */
closed()132     bool closed()
133     {
134       return work_queue.closed();
135     }
136 
137     /**
138      * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
139      *
140      * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
141      * If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads.
142      *
143      * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
144      *
145      * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
146      * Whatever exception that can be throw while storing the closure.
147      */
submit(BOOST_THREAD_RV_REF (work)closure)148     void submit(BOOST_THREAD_RV_REF(work) closure)  {
149       work_queue.push(boost::move(closure));
150     }
151 
152 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
153     template <typename Closure>
submit(Closure & closure)154     void submit(Closure & closure)
155     {
156       submit(work(closure));
157    }
158 #endif
159 
submit(void (* closure)())160     void submit(void (*closure)())
161     {
162       submit(work(closure));
163     }
164 
165     template <typename Closure>
submit(BOOST_THREAD_FWD_REF (Closure)closure)166     void submit(BOOST_THREAD_FWD_REF(Closure) closure)
167     {
168       //work_queue.push(work(boost::forward<Closure>(closure)));
169       work w((boost::forward<Closure>(closure)));
170       submit(boost::move(w));
171     }
172 
173     /**
174      * \b Requires: This must be called from an scheduled task.
175      *
176      * \b Effects: reschedule functions until pred()
177      */
178     template <typename Pred>
reschedule_until(Pred const & pred)179     bool reschedule_until(Pred const& pred)
180     {
181       do {
182         if ( ! try_executing_one())
183         {
184           return false;
185         }
186       } while (! pred());
187       return true;
188     }
189 
190     /**
191      * run queued closures
192      */
run_queued_closures()193     void run_queued_closures()
194     {
195       sync_queue<work>::underlying_queue_type q = work_queue.underlying_queue();
196       while (! q.empty())
197       {
198         work& task = q.front();
199         task();
200         q.pop_front();
201       }
202     }
203 
204   };
205 }
206 using executors::loop_executor;
207 
208 }
209 
210 #include <boost/config/abi_suffix.hpp>
211 #endif
212 
213 #endif
214