• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1//
2// detail/impl/select_reactor.ipp
3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4//
5// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#ifndef ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP
12#define ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP
13
14
15#include "asio/detail/config.hpp"
16
17#if defined(ASIO_HAS_IOCP)    || (!defined(ASIO_HAS_DEV_POLL)        && !defined(ASIO_HAS_EPOLL)        && !defined(ASIO_HAS_KQUEUE)        && !defined(ASIO_WINDOWS_RUNTIME))
18
19#include "asio/detail/bind_handler.hpp"
20#include "asio/detail/fd_set_adapter.hpp"
21#include "asio/detail/select_reactor.hpp"
22#include "asio/detail/signal_blocker.hpp"
23#include "asio/detail/socket_ops.hpp"
24
25#include "asio/detail/push_options.hpp"
26
27namespace asio {
28namespace detail {
29
30select_reactor::select_reactor(asio::io_service& io_service)
31  : asio::detail::service_base<select_reactor>(io_service),
32    io_service_(use_service<io_service_impl>(io_service)),
33    mutex_(),
34    interrupter_(),
35    shutdown_(false)
36{
37}
38
39select_reactor::~select_reactor()
40{
41  shutdown_service();
42}
43
44void select_reactor::shutdown_service()
45{
46  asio::detail::mutex::scoped_lock lock(mutex_);
47  shutdown_ = true;
48  lock.unlock();
49
50
51  op_queue<operation> ops;
52
53  for (int i = 0; i < max_ops; ++i)
54    op_queue_[i].get_all_operations(ops);
55
56  timer_queues_.get_all_timers(ops);
57
58  io_service_.abandon_operations(ops);
59}
60
61void select_reactor::fork_service(asio::io_service::fork_event fork_ev)
62{
63  if (fork_ev == asio::io_service::fork_child)
64    interrupter_.recreate();
65}
66
67void select_reactor::init_task()
68{
69  io_service_.init_task();
70}
71
72int select_reactor::register_descriptor(socket_type,
73    select_reactor::per_descriptor_data&)
74{
75  return 0;
76}
77
78int select_reactor::register_internal_descriptor(
79    int op_type, socket_type descriptor,
80    select_reactor::per_descriptor_data&, reactor_op* op)
81{
82  asio::detail::mutex::scoped_lock lock(mutex_);
83
84  op_queue_[op_type].enqueue_operation(descriptor, op);
85  interrupter_.interrupt();
86
87  return 0;
88}
89
90void select_reactor::move_descriptor(socket_type,
91    select_reactor::per_descriptor_data&,
92    select_reactor::per_descriptor_data&)
93{
94}
95
96void select_reactor::start_op(int op_type, socket_type descriptor,
97    select_reactor::per_descriptor_data&, reactor_op* op,
98    bool is_continuation, bool)
99{
100  asio::detail::mutex::scoped_lock lock(mutex_);
101
102  if (shutdown_)
103  {
104    post_immediate_completion(op, is_continuation);
105    return;
106  }
107
108  bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
109  io_service_.work_started();
110  if (first)
111    interrupter_.interrupt();
112}
113
114void select_reactor::cancel_ops(socket_type descriptor,
115    select_reactor::per_descriptor_data&)
116{
117  asio::detail::mutex::scoped_lock lock(mutex_);
118  cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
119}
120
121void select_reactor::deregister_descriptor(socket_type descriptor,
122    select_reactor::per_descriptor_data&, bool)
123{
124  asio::detail::mutex::scoped_lock lock(mutex_);
125  cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
126}
127
128void select_reactor::deregister_internal_descriptor(
129    socket_type descriptor, select_reactor::per_descriptor_data&)
130{
131  asio::detail::mutex::scoped_lock lock(mutex_);
132  op_queue<operation> ops;
133  for (int i = 0; i < max_ops; ++i)
134    op_queue_[i].cancel_operations(descriptor, ops);
135}
136
137void select_reactor::run(bool block, op_queue<operation>& ops)
138{
139  asio::detail::mutex::scoped_lock lock(mutex_);
140
141
142  // Set up the descriptor sets.
143  for (int i = 0; i < max_select_ops; ++i)
144    fd_sets_[i].reset();
145  fd_sets_[read_op].set(interrupter_.read_descriptor());
146  socket_type max_fd = 0;
147  bool have_work_to_do = !timer_queues_.all_empty();
148  for (int i = 0; i < max_select_ops; ++i)
149  {
150    have_work_to_do = have_work_to_do || !op_queue_[i].empty();
151    fd_sets_[i].set(op_queue_[i], ops);
152    if (fd_sets_[i].max_descriptor() > max_fd)
153      max_fd = fd_sets_[i].max_descriptor();
154  }
155
156
157  // We can return immediately if there's no work to do and the reactor is
158  // not supposed to block.
159  if (!block && !have_work_to_do)
160    return;
161
162  // Determine how long to block while waiting for events.
163  timeval tv_buf = { 0, 0 };
164  timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;
165
166  lock.unlock();
167
168  // Block on the select call until descriptors become ready.
169  asio::error_code ec;
170  int retval = socket_ops::select(static_cast<int>(max_fd + 1),
171      fd_sets_[read_op], fd_sets_[write_op], fd_sets_[except_op], tv, ec);
172
173  // Reset the interrupter.
174  if (retval > 0 && fd_sets_[read_op].is_set(interrupter_.read_descriptor()))
175  {
176    interrupter_.reset();
177    --retval;
178  }
179
180  lock.lock();
181
182  // Dispatch all ready operations.
183  if (retval > 0)
184  {
185
186    // Exception operations must be processed first to ensure that any
187    // out-of-band data is read before normal data.
188    for (int i = max_select_ops - 1; i >= 0; --i)
189      fd_sets_[i].perform(op_queue_[i], ops);
190  }
191  timer_queues_.get_ready_timers(ops);
192}
193
194void select_reactor::interrupt()
195{
196  interrupter_.interrupt();
197}
198
199
200void select_reactor::do_add_timer_queue(timer_queue_base& queue)
201{
202  mutex::scoped_lock lock(mutex_);
203  timer_queues_.insert(&queue);
204}
205
206void select_reactor::do_remove_timer_queue(timer_queue_base& queue)
207{
208  mutex::scoped_lock lock(mutex_);
209  timer_queues_.erase(&queue);
210}
211
212timeval* select_reactor::get_timeout(timeval& tv)
213{
214  // By default we will wait no longer than 5 minutes. This will ensure that
215  // any changes to the system clock are detected after no longer than this.
216  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
217  tv.tv_sec = usec / 1000000;
218  tv.tv_usec = usec % 1000000;
219  return &tv;
220}
221
222void select_reactor::cancel_ops_unlocked(socket_type descriptor,
223    const asio::error_code& ec)
224{
225  bool need_interrupt = false;
226  op_queue<operation> ops;
227  for (int i = 0; i < max_ops; ++i)
228    need_interrupt = op_queue_[i].cancel_operations(
229        descriptor, ops, ec) || need_interrupt;
230  io_service_.post_deferred_completions(ops);
231  if (need_interrupt)
232    interrupter_.interrupt();
233}
234
235} // namespace detail
236} // namespace asio
237
238#include "asio/detail/pop_options.hpp"
239
240#endif // defined(ASIO_HAS_IOCP)
241       //   || (!defined(ASIO_HAS_DEV_POLL)
242       //       && !defined(ASIO_HAS_EPOLL)
243       //       && !defined(ASIO_HAS_KQUEUE))
244       //       && !defined(ASIO_WINDOWS_RUNTIME))
245
246#endif // ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP
247