1 #include <boost/asio/execution.hpp>
2 #include <condition_variable>
3 #include <iostream>
4 #include <memory>
5 #include <mutex>
6 #include <queue>
7
8 namespace execution = boost::asio::execution;
9
10 namespace custom_props {
11
12 struct priority
13 {
14 template <typename T>
15 static constexpr bool is_applicable_property_v =
16 execution::is_executor<T>::value;
17
18 static constexpr bool is_requirable = true;
19 static constexpr bool is_preferable = true;
20
21 using polymorphic_query_result_type = int;
22
valuecustom_props::priority23 int value() const { return value_; }
24
25 int value_ = 1;
26 };
27
28 constexpr priority low_priority{0};
29 constexpr priority normal_priority{1};
30 constexpr priority high_priority{2};
31
32 } // namespace custom_props
33
34 class priority_scheduler
35 {
36 public:
37 // A class that satisfies the Executor requirements.
38 class executor_type
39 {
40 public:
executor_type(priority_scheduler & ctx)41 executor_type(priority_scheduler& ctx) noexcept
42 : context_(ctx), priority_(custom_props::normal_priority.value())
43 {
44 }
45
query(execution::context_t) const46 priority_scheduler& query(execution::context_t) const noexcept
47 {
48 return context_;
49 }
50
query(custom_props::priority) const51 int query(custom_props::priority) const noexcept
52 {
53 return priority_;
54 }
55
require(custom_props::priority pri) const56 executor_type require(custom_props::priority pri) const
57 {
58 executor_type new_ex(*this);
59 new_ex.priority_ = pri.value();
60 return new_ex;
61 }
62
63 template <class Func>
execute(Func f) const64 void execute(Func f) const
65 {
66 auto p(std::make_shared<item<Func>>(priority_, std::move(f)));
67 std::lock_guard<std::mutex> lock(context_.mutex_);
68 context_.queue_.push(p);
69 context_.condition_.notify_one();
70 }
71
operator ==(const executor_type & a,const executor_type & b)72 friend bool operator==(const executor_type& a,
73 const executor_type& b) noexcept
74 {
75 return &a.context_ == &b.context_;
76 }
77
operator !=(const executor_type & a,const executor_type & b)78 friend bool operator!=(const executor_type& a,
79 const executor_type& b) noexcept
80 {
81 return &a.context_ != &b.context_;
82 }
83
84 private:
85 priority_scheduler& context_;
86 int priority_;
87 };
88
executor()89 executor_type executor() noexcept
90 {
91 return executor_type(*const_cast<priority_scheduler*>(this));
92 }
93
run()94 void run()
95 {
96 std::unique_lock<std::mutex> lock(mutex_);
97 for (;;)
98 {
99 condition_.wait(lock, [&]{ return stopped_ || !queue_.empty(); });
100 if (stopped_)
101 return;
102 auto p(queue_.top());
103 queue_.pop();
104 lock.unlock();
105 p->execute_(p);
106 lock.lock();
107 }
108 }
109
stop()110 void stop()
111 {
112 std::lock_guard<std::mutex> lock(mutex_);
113 stopped_ = true;
114 condition_.notify_all();
115 }
116
117 private:
118 struct item_base
119 {
120 int priority_;
121 void (*execute_)(std::shared_ptr<item_base>&);
122 };
123
124 template <class Func>
125 struct item : item_base
126 {
itempriority_scheduler::item127 item(int pri, Func f) : function_(std::move(f))
128 {
129 priority_ = pri;
130 execute_ = [](std::shared_ptr<item_base>& p)
131 {
132 Func tmp(std::move(static_cast<item*>(p.get())->function_));
133 p.reset();
134 tmp();
135 };
136 }
137
138 Func function_;
139 };
140
141 struct item_comp
142 {
operator ()priority_scheduler::item_comp143 bool operator()(
144 const std::shared_ptr<item_base>& a,
145 const std::shared_ptr<item_base>& b)
146 {
147 return a->priority_ < b->priority_;
148 }
149 };
150
151 std::mutex mutex_;
152 std::condition_variable condition_;
153 std::priority_queue<
154 std::shared_ptr<item_base>,
155 std::vector<std::shared_ptr<item_base>>,
156 item_comp> queue_;
157 bool stopped_ = false;
158 };
159
main()160 int main()
161 {
162 priority_scheduler sched;
163 auto ex = sched.executor();
164 auto prefer_low = boost::asio::prefer(ex, custom_props::low_priority);
165 auto low = boost::asio::require(ex, custom_props::low_priority);
166 auto med = boost::asio::require(ex, custom_props::normal_priority);
167 auto high = boost::asio::require(ex, custom_props::high_priority);
168 execution::any_executor<custom_props::priority> poly_high(high);
169 execution::execute(prefer_low, []{ std::cout << "1\n"; });
170 execution::execute(low, []{ std::cout << "11\n"; });
171 execution::execute(low, []{ std::cout << "111\n"; });
172 execution::execute(med, []{ std::cout << "2\n"; });
173 execution::execute(med, []{ std::cout << "22\n"; });
174 execution::execute(high, []{ std::cout << "3\n"; });
175 execution::execute(high, []{ std::cout << "33\n"; });
176 execution::execute(high, []{ std::cout << "333\n"; });
177 execution::execute(poly_high, []{ std::cout << "3333\n"; });
178 execution::execute(boost::asio::require(ex, custom_props::priority{-1}), [&]{ sched.stop(); });
179 sched.run();
180 std::cout << "polymorphic query result = " << boost::asio::query(poly_high, custom_props::priority{}) << "\n";
181 }
182