• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // prioritised_handlers.cpp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #include <boost/asio.hpp>
12 #include <iostream>
13 #include <memory>
14 #include <queue>
15 
16 using boost::asio::ip::tcp;
17 
18 class handler_priority_queue : public boost::asio::execution_context
19 {
20 public:
21   template <typename Function>
add(int priority,Function function)22   void add(int priority, Function function)
23   {
24     std::unique_ptr<queued_handler_base> handler(
25         new queued_handler<Function>(
26           priority, std::move(function)));
27 
28     handlers_.push(std::move(handler));
29   }
30 
execute_all()31   void execute_all()
32   {
33     while (!handlers_.empty())
34     {
35       handlers_.top()->execute();
36       handlers_.pop();
37     }
38   }
39 
40   class executor
41   {
42   public:
executor(handler_priority_queue & q,int p)43     executor(handler_priority_queue& q, int p)
44       : context_(q), priority_(p)
45     {
46     }
47 
context() const48     handler_priority_queue& context() const noexcept
49     {
50       return context_;
51     }
52 
53     template <typename Function, typename Allocator>
dispatch(Function f,const Allocator &) const54     void dispatch(Function f, const Allocator&) const
55     {
56       context_.add(priority_, std::move(f));
57     }
58 
59     template <typename Function, typename Allocator>
post(Function f,const Allocator &) const60     void post(Function f, const Allocator&) const
61     {
62       context_.add(priority_, std::move(f));
63     }
64 
65     template <typename Function, typename Allocator>
defer(Function f,const Allocator &) const66     void defer(Function f, const Allocator&) const
67     {
68       context_.add(priority_, std::move(f));
69     }
70 
on_work_started() const71     void on_work_started() const noexcept {}
on_work_finished() const72     void on_work_finished() const noexcept {}
73 
operator ==(const executor & other) const74     bool operator==(const executor& other) const noexcept
75     {
76       return &context_ == &other.context_ && priority_ == other.priority_;
77     }
78 
operator !=(const executor & other) const79     bool operator!=(const executor& other) const noexcept
80     {
81       return !operator==(other);
82     }
83 
84   private:
85     handler_priority_queue& context_;
86     int priority_;
87   };
88 
89   template <typename Handler>
90   boost::asio::executor_binder<Handler, executor>
wrap(int priority,Handler handler)91   wrap(int priority, Handler handler)
92   {
93     return boost::asio::bind_executor(
94         executor(*this, priority), std::move(handler));
95   }
96 
97 private:
98   class queued_handler_base
99   {
100   public:
queued_handler_base(int p)101     queued_handler_base(int p)
102       : priority_(p)
103     {
104     }
105 
~queued_handler_base()106     virtual ~queued_handler_base()
107     {
108     }
109 
110     virtual void execute() = 0;
111 
operator <(const std::unique_ptr<queued_handler_base> & a,const std::unique_ptr<queued_handler_base> & b)112     friend bool operator<(const std::unique_ptr<queued_handler_base>& a,
113         const std::unique_ptr<queued_handler_base>& b) noexcept
114     {
115       return a->priority_ < b->priority_;
116     }
117 
118   private:
119     int priority_;
120   };
121 
122   template <typename Function>
123   class queued_handler : public queued_handler_base
124   {
125   public:
queued_handler(int p,Function f)126     queued_handler(int p, Function f)
127       : queued_handler_base(p), function_(std::move(f))
128     {
129     }
130 
execute()131     void execute() override
132     {
133       function_();
134     }
135 
136   private:
137     Function function_;
138   };
139 
140   std::priority_queue<std::unique_ptr<queued_handler_base>> handlers_;
141 };
142 
143 //----------------------------------------------------------------------
144 
high_priority_handler(const boost::system::error_code &,tcp::socket)145 void high_priority_handler(const boost::system::error_code& /*ec*/,
146     tcp::socket /*socket*/)
147 {
148   std::cout << "High priority handler\n";
149 }
150 
middle_priority_handler(const boost::system::error_code &)151 void middle_priority_handler(const boost::system::error_code& /*ec*/)
152 {
153   std::cout << "Middle priority handler\n";
154 }
155 
156 struct low_priority_handler
157 {
158   // Make the handler a move-only type.
159   low_priority_handler() = default;
160   low_priority_handler(const low_priority_handler&) = delete;
161   low_priority_handler(low_priority_handler&&) = default;
162 
operator ()low_priority_handler163   void operator()()
164   {
165     std::cout << "Low priority handler\n";
166   }
167 };
168 
main()169 int main()
170 {
171   boost::asio::io_context io_context;
172 
173   handler_priority_queue pri_queue;
174 
175   // Post a completion handler to be run immediately.
176   boost::asio::post(io_context, pri_queue.wrap(0, low_priority_handler()));
177 
178   // Start an asynchronous accept that will complete immediately.
179   tcp::endpoint endpoint(boost::asio::ip::address_v4::loopback(), 0);
180   tcp::acceptor acceptor(io_context, endpoint);
181   tcp::socket server_socket(io_context);
182   acceptor.async_accept(pri_queue.wrap(100, high_priority_handler));
183   tcp::socket client_socket(io_context);
184   client_socket.connect(acceptor.local_endpoint());
185 
186   // Set a deadline timer to expire immediately.
187   boost::asio::steady_timer timer(io_context);
188   timer.expires_at(boost::asio::steady_timer::clock_type::time_point::min());
189   timer.async_wait(pri_queue.wrap(42, middle_priority_handler));
190 
191   while (io_context.run_one())
192   {
193     // The custom invocation hook adds the handlers to the priority queue
194     // rather than executing them from within the poll_one() call.
195     while (io_context.poll_one())
196       ;
197 
198     pri_queue.execute_all();
199   }
200 
201   return 0;
202 }
203