• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include <boost/asio/execution.hpp>
2 #include <condition_variable>
3 #include <iostream>
4 #include <memory>
5 #include <mutex>
6 #include <queue>
7 
8 namespace execution = boost::asio::execution;
9 
10 namespace custom_props {
11 
12   struct priority
13   {
14     template <typename T>
15     static constexpr bool is_applicable_property_v =
16       execution::is_executor<T>::value;
17 
18     static constexpr bool is_requirable = true;
19     static constexpr bool is_preferable = true;
20 
21     using polymorphic_query_result_type = int;
22 
valuecustom_props::priority23     int value() const { return value_; }
24 
25     int value_ = 1;
26   };
27 
28   constexpr priority low_priority{0};
29   constexpr priority normal_priority{1};
30   constexpr priority high_priority{2};
31 
32 } // namespace custom_props
33 
34 class priority_scheduler
35 {
36 public:
37   // A class that satisfies the Executor requirements.
38   class executor_type
39   {
40   public:
executor_type(priority_scheduler & ctx)41     executor_type(priority_scheduler& ctx) noexcept
42       : context_(ctx), priority_(custom_props::normal_priority.value())
43     {
44     }
45 
query(execution::context_t) const46     priority_scheduler& query(execution::context_t) const noexcept
47     {
48       return context_;
49     }
50 
query(custom_props::priority) const51     int query(custom_props::priority) const noexcept
52     {
53       return priority_;
54     }
55 
require(custom_props::priority pri) const56     executor_type require(custom_props::priority pri) const
57     {
58       executor_type new_ex(*this);
59       new_ex.priority_ = pri.value();
60       return new_ex;
61     }
62 
63     template <class Func>
execute(Func f) const64     void execute(Func f) const
65     {
66       auto p(std::make_shared<item<Func>>(priority_, std::move(f)));
67       std::lock_guard<std::mutex> lock(context_.mutex_);
68       context_.queue_.push(p);
69       context_.condition_.notify_one();
70     }
71 
operator ==(const executor_type & a,const executor_type & b)72     friend bool operator==(const executor_type& a,
73         const executor_type& b) noexcept
74     {
75       return &a.context_ == &b.context_;
76     }
77 
operator !=(const executor_type & a,const executor_type & b)78     friend bool operator!=(const executor_type& a,
79         const executor_type& b) noexcept
80     {
81       return &a.context_ != &b.context_;
82     }
83 
84   private:
85     priority_scheduler& context_;
86     int priority_;
87   };
88 
executor()89   executor_type executor() noexcept
90   {
91     return executor_type(*const_cast<priority_scheduler*>(this));
92   }
93 
run()94   void run()
95   {
96     std::unique_lock<std::mutex> lock(mutex_);
97     for (;;)
98     {
99       condition_.wait(lock, [&]{ return stopped_ || !queue_.empty(); });
100       if (stopped_)
101         return;
102       auto p(queue_.top());
103       queue_.pop();
104       lock.unlock();
105       p->execute_(p);
106       lock.lock();
107     }
108   }
109 
stop()110   void stop()
111   {
112     std::lock_guard<std::mutex> lock(mutex_);
113     stopped_ = true;
114     condition_.notify_all();
115   }
116 
117 private:
118   struct item_base
119   {
120     int priority_;
121     void (*execute_)(std::shared_ptr<item_base>&);
122   };
123 
124   template <class Func>
125   struct item : item_base
126   {
itempriority_scheduler::item127     item(int pri, Func f) : function_(std::move(f))
128     {
129       priority_ = pri;
130       execute_ = [](std::shared_ptr<item_base>& p)
131       {
132         Func tmp(std::move(static_cast<item*>(p.get())->function_));
133         p.reset();
134         tmp();
135       };
136     }
137 
138     Func function_;
139   };
140 
141   struct item_comp
142   {
operator ()priority_scheduler::item_comp143     bool operator()(
144         const std::shared_ptr<item_base>& a,
145         const std::shared_ptr<item_base>& b)
146     {
147       return a->priority_ < b->priority_;
148     }
149   };
150 
151   std::mutex mutex_;
152   std::condition_variable condition_;
153   std::priority_queue<
154     std::shared_ptr<item_base>,
155     std::vector<std::shared_ptr<item_base>>,
156     item_comp> queue_;
157   bool stopped_ = false;
158 };
159 
main()160 int main()
161 {
162   priority_scheduler sched;
163   auto ex = sched.executor();
164   auto prefer_low = boost::asio::prefer(ex, custom_props::low_priority);
165   auto low = boost::asio::require(ex, custom_props::low_priority);
166   auto med = boost::asio::require(ex, custom_props::normal_priority);
167   auto high = boost::asio::require(ex, custom_props::high_priority);
168   execution::any_executor<custom_props::priority> poly_high(high);
169   execution::execute(prefer_low, []{ std::cout << "1\n"; });
170   execution::execute(low, []{ std::cout << "11\n"; });
171   execution::execute(low, []{ std::cout << "111\n"; });
172   execution::execute(med, []{ std::cout << "2\n"; });
173   execution::execute(med, []{ std::cout << "22\n"; });
174   execution::execute(high, []{ std::cout << "3\n"; });
175   execution::execute(high, []{ std::cout << "33\n"; });
176   execution::execute(high, []{ std::cout << "333\n"; });
177   execution::execute(poly_high, []{ std::cout << "3333\n"; });
178   execution::execute(boost::asio::require(ex, custom_props::priority{-1}), [&]{ sched.stop(); });
179   sched.run();
180   std::cout << "polymorphic query result = " << boost::asio::query(poly_high, custom_props::priority{}) << "\n";
181 }
182