1 //
2 // prioritised_handlers.cpp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <boost/asio.hpp>
12 #include <iostream>
13 #include <memory>
14 #include <queue>
15
16 using boost::asio::ip::tcp;
17
18 class handler_priority_queue : public boost::asio::execution_context
19 {
20 public:
21 template <typename Function>
add(int priority,Function function)22 void add(int priority, Function function)
23 {
24 std::unique_ptr<queued_handler_base> handler(
25 new queued_handler<Function>(
26 priority, std::move(function)));
27
28 handlers_.push(std::move(handler));
29 }
30
execute_all()31 void execute_all()
32 {
33 while (!handlers_.empty())
34 {
35 handlers_.top()->execute();
36 handlers_.pop();
37 }
38 }
39
40 class executor
41 {
42 public:
executor(handler_priority_queue & q,int p)43 executor(handler_priority_queue& q, int p)
44 : context_(q), priority_(p)
45 {
46 }
47
context() const48 handler_priority_queue& context() const noexcept
49 {
50 return context_;
51 }
52
53 template <typename Function, typename Allocator>
dispatch(Function f,const Allocator &) const54 void dispatch(Function f, const Allocator&) const
55 {
56 context_.add(priority_, std::move(f));
57 }
58
59 template <typename Function, typename Allocator>
post(Function f,const Allocator &) const60 void post(Function f, const Allocator&) const
61 {
62 context_.add(priority_, std::move(f));
63 }
64
65 template <typename Function, typename Allocator>
defer(Function f,const Allocator &) const66 void defer(Function f, const Allocator&) const
67 {
68 context_.add(priority_, std::move(f));
69 }
70
on_work_started() const71 void on_work_started() const noexcept {}
on_work_finished() const72 void on_work_finished() const noexcept {}
73
operator ==(const executor & other) const74 bool operator==(const executor& other) const noexcept
75 {
76 return &context_ == &other.context_ && priority_ == other.priority_;
77 }
78
operator !=(const executor & other) const79 bool operator!=(const executor& other) const noexcept
80 {
81 return !operator==(other);
82 }
83
84 private:
85 handler_priority_queue& context_;
86 int priority_;
87 };
88
89 template <typename Handler>
90 boost::asio::executor_binder<Handler, executor>
wrap(int priority,Handler handler)91 wrap(int priority, Handler handler)
92 {
93 return boost::asio::bind_executor(
94 executor(*this, priority), std::move(handler));
95 }
96
97 private:
98 class queued_handler_base
99 {
100 public:
queued_handler_base(int p)101 queued_handler_base(int p)
102 : priority_(p)
103 {
104 }
105
~queued_handler_base()106 virtual ~queued_handler_base()
107 {
108 }
109
110 virtual void execute() = 0;
111
operator <(const std::unique_ptr<queued_handler_base> & a,const std::unique_ptr<queued_handler_base> & b)112 friend bool operator<(const std::unique_ptr<queued_handler_base>& a,
113 const std::unique_ptr<queued_handler_base>& b) noexcept
114 {
115 return a->priority_ < b->priority_;
116 }
117
118 private:
119 int priority_;
120 };
121
122 template <typename Function>
123 class queued_handler : public queued_handler_base
124 {
125 public:
queued_handler(int p,Function f)126 queued_handler(int p, Function f)
127 : queued_handler_base(p), function_(std::move(f))
128 {
129 }
130
execute()131 void execute() override
132 {
133 function_();
134 }
135
136 private:
137 Function function_;
138 };
139
140 std::priority_queue<std::unique_ptr<queued_handler_base>> handlers_;
141 };
142
143 //----------------------------------------------------------------------
144
high_priority_handler(const boost::system::error_code &,tcp::socket)145 void high_priority_handler(const boost::system::error_code& /*ec*/,
146 tcp::socket /*socket*/)
147 {
148 std::cout << "High priority handler\n";
149 }
150
middle_priority_handler(const boost::system::error_code &)151 void middle_priority_handler(const boost::system::error_code& /*ec*/)
152 {
153 std::cout << "Middle priority handler\n";
154 }
155
156 struct low_priority_handler
157 {
158 // Make the handler a move-only type.
159 low_priority_handler() = default;
160 low_priority_handler(const low_priority_handler&) = delete;
161 low_priority_handler(low_priority_handler&&) = default;
162
operator ()low_priority_handler163 void operator()()
164 {
165 std::cout << "Low priority handler\n";
166 }
167 };
168
main()169 int main()
170 {
171 boost::asio::io_context io_context;
172
173 handler_priority_queue pri_queue;
174
175 // Post a completion handler to be run immediately.
176 boost::asio::post(io_context, pri_queue.wrap(0, low_priority_handler()));
177
178 // Start an asynchronous accept that will complete immediately.
179 tcp::endpoint endpoint(boost::asio::ip::address_v4::loopback(), 0);
180 tcp::acceptor acceptor(io_context, endpoint);
181 tcp::socket server_socket(io_context);
182 acceptor.async_accept(pri_queue.wrap(100, high_priority_handler));
183 tcp::socket client_socket(io_context);
184 client_socket.connect(acceptor.local_endpoint());
185
186 // Set a deadline timer to expire immediately.
187 boost::asio::steady_timer timer(io_context);
188 timer.expires_at(boost::asio::steady_timer::clock_type::time_point::min());
189 timer.async_wait(pri_queue.wrap(42, middle_priority_handler));
190
191 while (io_context.run_one())
192 {
193 // The custom invocation hook adds the handlers to the priority queue
194 // rather than executing them from within the poll_one() call.
195 while (io_context.poll_one())
196 ;
197
198 pri_queue.execute_all();
199 }
200
201 return 0;
202 }
203