• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // detail/impl/strand_executor_service.hpp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #ifndef BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_HPP
12 #define BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_HPP
13 
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17 
18 #include <boost/asio/detail/call_stack.hpp>
19 #include <boost/asio/detail/fenced_block.hpp>
20 #include <boost/asio/detail/handler_invoke_helpers.hpp>
21 #include <boost/asio/detail/recycling_allocator.hpp>
22 #include <boost/asio/executor_work_guard.hpp>
23 #include <boost/asio/defer.hpp>
24 #include <boost/asio/dispatch.hpp>
25 #include <boost/asio/post.hpp>
26 
27 #include <boost/asio/detail/push_options.hpp>
28 
29 namespace boost {
30 namespace asio {
31 namespace detail {
32 
33 template <typename F, typename Allocator>
34 class strand_executor_service::allocator_binder
35 {
36 public:
37   typedef Allocator allocator_type;
38 
allocator_binder(BOOST_ASIO_MOVE_ARG (F)f,const Allocator & a)39   allocator_binder(BOOST_ASIO_MOVE_ARG(F) f, const Allocator& a)
40     : f_(BOOST_ASIO_MOVE_CAST(F)(f)),
41       allocator_(a)
42   {
43   }
44 
allocator_binder(const allocator_binder & other)45   allocator_binder(const allocator_binder& other)
46     : f_(other.f_),
47       allocator_(other.allocator_)
48   {
49   }
50 
51 #if defined(BOOST_ASIO_HAS_MOVE)
allocator_binder(allocator_binder && other)52   allocator_binder(allocator_binder&& other)
53     : f_(BOOST_ASIO_MOVE_CAST(F)(other.f_)),
54       allocator_(BOOST_ASIO_MOVE_CAST(allocator_type)(other.allocator_))
55   {
56   }
57 #endif // defined(BOOST_ASIO_HAS_MOVE)
58 
get_allocator() const59   allocator_type get_allocator() const BOOST_ASIO_NOEXCEPT
60   {
61     return allocator_;
62   }
63 
operator ()()64   void operator()()
65   {
66     f_();
67   }
68 
69 private:
70   F f_;
71   allocator_type allocator_;
72 };
73 
74 template <typename Executor>
75 class strand_executor_service::invoker<Executor,
76     typename enable_if<
77       execution::is_executor<Executor>::value
78     >::type>
79 {
80 public:
invoker(const implementation_type & impl,Executor & ex)81   invoker(const implementation_type& impl, Executor& ex)
82     : impl_(impl),
83       executor_(boost::asio::prefer(ex, execution::outstanding_work.tracked))
84   {
85   }
86 
invoker(const invoker & other)87   invoker(const invoker& other)
88     : impl_(other.impl_),
89       executor_(other.executor_)
90   {
91   }
92 
93 #if defined(BOOST_ASIO_HAS_MOVE)
invoker(invoker && other)94   invoker(invoker&& other)
95     : impl_(BOOST_ASIO_MOVE_CAST(implementation_type)(other.impl_)),
96       executor_(BOOST_ASIO_MOVE_CAST(executor_type)(other.executor_))
97   {
98   }
99 #endif // defined(BOOST_ASIO_HAS_MOVE)
100 
101   struct on_invoker_exit
102   {
103     invoker* this_;
104 
~on_invoker_exitboost::asio::detail::strand_executor_service::invoker::on_invoker_exit105     ~on_invoker_exit()
106     {
107       this_->impl_->mutex_->lock();
108       this_->impl_->ready_queue_.push(this_->impl_->waiting_queue_);
109       bool more_handlers = this_->impl_->locked_ =
110         !this_->impl_->ready_queue_.empty();
111       this_->impl_->mutex_->unlock();
112 
113       if (more_handlers)
114       {
115         recycling_allocator<void> allocator;
116         execution::execute(
117             boost::asio::prefer(
118               boost::asio::require(this_->executor_,
119                 execution::blocking.never),
120             execution::allocator(allocator)),
121             BOOST_ASIO_MOVE_CAST(invoker)(*this_));
122       }
123     }
124   };
125 
operator ()()126   void operator()()
127   {
128     // Indicate that this strand is executing on the current thread.
129     call_stack<strand_impl>::context ctx(impl_.get());
130 
131     // Ensure the next handler, if any, is scheduled on block exit.
132     on_invoker_exit on_exit = { this };
133     (void)on_exit;
134 
135     // Run all ready handlers. No lock is required since the ready queue is
136     // accessed only within the strand.
137     boost::system::error_code ec;
138     while (scheduler_operation* o = impl_->ready_queue_.front())
139     {
140       impl_->ready_queue_.pop();
141       o->complete(impl_.get(), ec, 0);
142     }
143   }
144 
145 private:
146   typedef typename decay<
147       typename prefer_result<
148         Executor,
149         execution::outstanding_work_t::tracked_t
150       >::type
151     >::type executor_type;
152 
153   implementation_type impl_;
154   executor_type executor_;
155 };
156 
157 #if !defined(BOOST_ASIO_NO_TS_EXECUTORS)
158 
159 template <typename Executor>
160 class strand_executor_service::invoker<Executor,
161     typename enable_if<
162       !execution::is_executor<Executor>::value
163     >::type>
164 {
165 public:
invoker(const implementation_type & impl,Executor & ex)166   invoker(const implementation_type& impl, Executor& ex)
167     : impl_(impl),
168       work_(ex)
169   {
170   }
171 
invoker(const invoker & other)172   invoker(const invoker& other)
173     : impl_(other.impl_),
174       work_(other.work_)
175   {
176   }
177 
178 #if defined(BOOST_ASIO_HAS_MOVE)
invoker(invoker && other)179   invoker(invoker&& other)
180     : impl_(BOOST_ASIO_MOVE_CAST(implementation_type)(other.impl_)),
181       work_(BOOST_ASIO_MOVE_CAST(executor_work_guard<Executor>)(other.work_))
182   {
183   }
184 #endif // defined(BOOST_ASIO_HAS_MOVE)
185 
186   struct on_invoker_exit
187   {
188     invoker* this_;
189 
~on_invoker_exitboost::asio::detail::strand_executor_service::invoker::on_invoker_exit190     ~on_invoker_exit()
191     {
192       this_->impl_->mutex_->lock();
193       this_->impl_->ready_queue_.push(this_->impl_->waiting_queue_);
194       bool more_handlers = this_->impl_->locked_ =
195         !this_->impl_->ready_queue_.empty();
196       this_->impl_->mutex_->unlock();
197 
198       if (more_handlers)
199       {
200         Executor ex(this_->work_.get_executor());
201         recycling_allocator<void> allocator;
202         ex.post(BOOST_ASIO_MOVE_CAST(invoker)(*this_), allocator);
203       }
204     }
205   };
206 
operator ()()207   void operator()()
208   {
209     // Indicate that this strand is executing on the current thread.
210     call_stack<strand_impl>::context ctx(impl_.get());
211 
212     // Ensure the next handler, if any, is scheduled on block exit.
213     on_invoker_exit on_exit = { this };
214     (void)on_exit;
215 
216     // Run all ready handlers. No lock is required since the ready queue is
217     // accessed only within the strand.
218     boost::system::error_code ec;
219     while (scheduler_operation* o = impl_->ready_queue_.front())
220     {
221       impl_->ready_queue_.pop();
222       o->complete(impl_.get(), ec, 0);
223     }
224   }
225 
226 private:
227   implementation_type impl_;
228   executor_work_guard<Executor> work_;
229 };
230 
231 #endif // !defined(BOOST_ASIO_NO_TS_EXECUTORS)
232 
233 template <typename Executor, typename Function>
execute(const implementation_type & impl,Executor & ex,BOOST_ASIO_MOVE_ARG (Function)function,typename enable_if<can_query<Executor,execution::allocator_t<void>>::value>::type *)234 inline void strand_executor_service::execute(const implementation_type& impl,
235     Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function,
236     typename enable_if<
237       can_query<Executor, execution::allocator_t<void> >::value
238     >::type*)
239 {
240   return strand_executor_service::do_execute(impl, ex,
241       BOOST_ASIO_MOVE_CAST(Function)(function),
242       boost::asio::query(ex, execution::allocator));
243 }
244 
245 template <typename Executor, typename Function>
execute(const implementation_type & impl,Executor & ex,BOOST_ASIO_MOVE_ARG (Function)function,typename enable_if<!can_query<Executor,execution::allocator_t<void>>::value>::type *)246 inline void strand_executor_service::execute(const implementation_type& impl,
247     Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function,
248     typename enable_if<
249       !can_query<Executor, execution::allocator_t<void> >::value
250     >::type*)
251 {
252   return strand_executor_service::do_execute(impl, ex,
253       BOOST_ASIO_MOVE_CAST(Function)(function),
254       std::allocator<void>());
255 }
256 
257 template <typename Executor, typename Function, typename Allocator>
do_execute(const implementation_type & impl,Executor & ex,BOOST_ASIO_MOVE_ARG (Function)function,const Allocator & a)258 void strand_executor_service::do_execute(const implementation_type& impl,
259     Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function, const Allocator& a)
260 {
261   typedef typename decay<Function>::type function_type;
262 
263   // If the executor is not never-blocking, and we are already in the strand,
264   // then the function can run immediately.
265   if (boost::asio::query(ex, execution::blocking) != execution::blocking.never
266       && call_stack<strand_impl>::contains(impl.get()))
267   {
268     // Make a local, non-const copy of the function.
269     function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(function));
270 
271     fenced_block b(fenced_block::full);
272     boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
273     return;
274   }
275 
276   // Allocate and construct an operation to wrap the function.
277   typedef executor_op<function_type, Allocator> op;
278   typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
279   p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(function), a);
280 
281   BOOST_ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
282         "strand_executor", impl.get(), 0, "execute"));
283 
284   // Add the function to the strand and schedule the strand if required.
285   bool first = enqueue(impl, p.p);
286   p.v = p.p = 0;
287   if (first)
288   {
289     execution::execute(ex, invoker<Executor>(impl, ex));
290   }
291 }
292 
293 template <typename Executor, typename Function, typename Allocator>
dispatch(const implementation_type & impl,Executor & ex,BOOST_ASIO_MOVE_ARG (Function)function,const Allocator & a)294 void strand_executor_service::dispatch(const implementation_type& impl,
295     Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function, const Allocator& a)
296 {
297   typedef typename decay<Function>::type function_type;
298 
299   // If we are already in the strand then the function can run immediately.
300   if (call_stack<strand_impl>::contains(impl.get()))
301   {
302     // Make a local, non-const copy of the function.
303     function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(function));
304 
305     fenced_block b(fenced_block::full);
306     boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
307     return;
308   }
309 
310   // Allocate and construct an operation to wrap the function.
311   typedef executor_op<function_type, Allocator> op;
312   typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
313   p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(function), a);
314 
315   BOOST_ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
316         "strand_executor", impl.get(), 0, "dispatch"));
317 
318   // Add the function to the strand and schedule the strand if required.
319   bool first = enqueue(impl, p.p);
320   p.v = p.p = 0;
321   if (first)
322   {
323     boost::asio::dispatch(ex,
324         allocator_binder<invoker<Executor>, Allocator>(
325           invoker<Executor>(impl, ex), a));
326   }
327 }
328 
329 // Request invocation of the given function and return immediately.
330 template <typename Executor, typename Function, typename Allocator>
post(const implementation_type & impl,Executor & ex,BOOST_ASIO_MOVE_ARG (Function)function,const Allocator & a)331 void strand_executor_service::post(const implementation_type& impl,
332     Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function, const Allocator& a)
333 {
334   typedef typename decay<Function>::type function_type;
335 
336   // Allocate and construct an operation to wrap the function.
337   typedef executor_op<function_type, Allocator> op;
338   typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
339   p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(function), a);
340 
341   BOOST_ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
342         "strand_executor", impl.get(), 0, "post"));
343 
344   // Add the function to the strand and schedule the strand if required.
345   bool first = enqueue(impl, p.p);
346   p.v = p.p = 0;
347   if (first)
348   {
349     boost::asio::post(ex,
350         allocator_binder<invoker<Executor>, Allocator>(
351           invoker<Executor>(impl, ex), a));
352   }
353 }
354 
355 // Request invocation of the given function and return immediately.
356 template <typename Executor, typename Function, typename Allocator>
defer(const implementation_type & impl,Executor & ex,BOOST_ASIO_MOVE_ARG (Function)function,const Allocator & a)357 void strand_executor_service::defer(const implementation_type& impl,
358     Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function, const Allocator& a)
359 {
360   typedef typename decay<Function>::type function_type;
361 
362   // Allocate and construct an operation to wrap the function.
363   typedef executor_op<function_type, Allocator> op;
364   typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
365   p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(function), a);
366 
367   BOOST_ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
368         "strand_executor", impl.get(), 0, "defer"));
369 
370   // Add the function to the strand and schedule the strand if required.
371   bool first = enqueue(impl, p.p);
372   p.v = p.p = 0;
373   if (first)
374   {
375     boost::asio::defer(ex,
376         allocator_binder<invoker<Executor>, Allocator>(
377           invoker<Executor>(impl, ex), a));
378   }
379 }
380 
381 } // namespace detail
382 } // namespace asio
383 } // namespace boost
384 
385 #include <boost/asio/detail/pop_options.hpp>
386 
387 #endif // BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_HPP
388