• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2013 Vicente J. Botet Escriba
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 //
6 // 2013/11 Vicente J. Botet Escriba
7 //    first implementation of a simple serial scheduler.
8 
9 #ifndef BOOST_THREAD_SERIAL_EXECUTOR_HPP
10 #define BOOST_THREAD_SERIAL_EXECUTOR_HPP
11 
12 #include <boost/thread/detail/config.hpp>
13 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
14 
15 #include <exception>
16 #include <boost/thread/detail/delete.hpp>
17 #include <boost/thread/detail/move.hpp>
18 #include <boost/thread/concurrent_queues/sync_queue.hpp>
19 #include <boost/thread/executors/work.hpp>
20 #include <boost/thread/executors/generic_executor_ref.hpp>
21 #include <boost/thread/future.hpp>
22 #include <boost/thread/scoped_thread.hpp>
23 
24 #include <boost/config/abi_prefix.hpp>
25 
26 #if defined(BOOST_MSVC)
27 # pragma warning(push)
28 # pragma warning(disable: 4355) // 'this' : used in base member initializer list
29 #endif
30 
31 namespace boost
32 {
33 namespace executors
34 {
35   class serial_executor
36   {
37   public:
38     /// type-erasure to store the works to do
39     typedef  executors::work work;
40   private:
41     typedef  scoped_thread<> thread_t;
42 
43     /// the thread safe work queue
44     concurrent::sync_queue<work > work_queue;
45     generic_executor_ref ex;
46     thread_t thr;
47 
48     struct try_executing_one_task {
49       work& task;
50       boost::promise<void> &p;
try_executing_one_taskboost::executors::serial_executor::try_executing_one_task51       try_executing_one_task(work& task, boost::promise<void> &p)
52       : task(task), p(p) {}
operator ()boost::executors::serial_executor::try_executing_one_task53       void operator()() {
54         try {
55           task();
56           p.set_value();
57         } catch (...)
58         {
59           p.set_exception(current_exception());
60         }
61       }
62     };
63   public:
64     /**
65      * \par Returns
66      * The underlying executor wrapped on a generic executor reference.
67      */
underlying_executor()68     generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; }
69 
70     /**
71      * Effects: try to execute one task.
72      * Returns: whether a task has been executed.
73      * Throws: whatever the current task constructor throws or the task() throws.
74      */
try_executing_one()75     bool try_executing_one()
76     {
77       work task;
78       try
79       {
80         if (work_queue.try_pull(task) == queue_op_status::success)
81         {
82           boost::promise<void> p;
83           try_executing_one_task tmp(task,p);
84           ex.submit(tmp);
85           p.get_future().wait();
86           return true;
87         }
88         return false;
89       }
90       catch (...)
91       {
92         std::terminate();
93         //return false;
94       }
95     }
96   private:
97     /**
98      * Effects: schedule one task or yields
99      * Throws: whatever the current task constructor throws or the task() throws.
100      */
schedule_one_or_yield()101     void schedule_one_or_yield()
102     {
103         if ( ! try_executing_one())
104         {
105           this_thread::yield();
106         }
107     }
108 
109     /**
110      * The main loop of the worker thread
111      */
worker_thread()112     void worker_thread()
113     {
114       while (!closed())
115       {
116         schedule_one_or_yield();
117       }
118       while (try_executing_one())
119       {
120       }
121     }
122 
123   public:
124     /// serial_executor is not copyable.
125     BOOST_THREAD_NO_COPYABLE(serial_executor)
126 
127     /**
128      * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
129      *
130      * \b Throws: Whatever exception is thrown while initializing the needed resources.
131      */
132     template <class Executor>
serial_executor(Executor & ex)133     serial_executor(Executor& ex)
134     : ex(ex), thr(&serial_executor::worker_thread, this)
135     {
136     }
137     /**
138      * \b Effects: Destroys the thread pool.
139      *
140      * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor.
141      */
~serial_executor()142     ~serial_executor()
143     {
144       // signal to the worker thread that there will be no more submissions.
145       close();
146     }
147 
148     /**
149      * \b Effects: close the \c serial_executor for submissions.
150      * The loop will work until there is no more closures to run.
151      */
close()152     void close()
153     {
154       work_queue.close();
155     }
156 
157     /**
158      * \b Returns: whether the pool is closed for submissions.
159      */
closed()160     bool closed()
161     {
162       return work_queue.closed();
163     }
164 
165     /**
166      * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
167      *
168      * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
169      * If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
170      *
171      * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
172      *
173      * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
174      * Whatever exception that can be throw while storing the closure.
175      */
submit(BOOST_THREAD_RV_REF (work)closure)176     void submit(BOOST_THREAD_RV_REF(work) closure)
177     {
178       work_queue.push(boost::move(closure));
179     }
180 
181 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
182     template <typename Closure>
submit(Closure & closure)183     void submit(Closure & closure)
184     {
185       submit(work(closure));
186     }
187 #endif
submit(void (* closure)())188     void submit(void (*closure)())
189     {
190       submit(work(closure));
191     }
192 
193     template <typename Closure>
submit(BOOST_THREAD_FWD_REF (Closure)closure)194     void submit(BOOST_THREAD_FWD_REF(Closure) closure)
195     {
196       work w((boost::forward<Closure>(closure)));
197       submit(boost::move(w));
198     }
199 
200     /**
201      * \b Requires: This must be called from an scheduled task.
202      *
203      * \b Effects: reschedule functions until pred()
204      */
205     template <typename Pred>
reschedule_until(Pred const & pred)206     bool reschedule_until(Pred const& pred)
207     {
208       do {
209         if ( ! try_executing_one())
210         {
211           return false;
212         }
213       } while (! pred());
214       return true;
215     }
216 
217   };
218 }
219 using executors::serial_executor;
220 }
221 
222 #if defined(BOOST_MSVC)
223 # pragma warning(pop)
224 #endif
225 
226 #include <boost/config/abi_suffix.hpp>
227 
228 #endif
229 #endif
230