• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include <boost/asio/dispatch.hpp>
2 #include <boost/asio/execution_context.hpp>
3 #include <condition_variable>
4 #include <iostream>
5 #include <memory>
6 #include <mutex>
7 #include <queue>
8 
9 using boost::asio::dispatch;
10 using boost::asio::execution_context;
11 namespace execution = boost::asio::execution;
12 
13 class priority_scheduler : public execution_context
14 {
15 public:
16   // A class that satisfies the Executor requirements.
17   class executor_type
18   {
19   public:
executor_type(priority_scheduler & ctx,int pri)20     executor_type(priority_scheduler& ctx, int pri) noexcept
21       : context_(ctx), priority_(pri)
22     {
23     }
24 
query(execution::context_t) const25     priority_scheduler& query(execution::context_t) const noexcept
26     {
27       return context_;
28     }
29 
30     template <class Func>
execute(Func f) const31     void execute(Func f) const
32     {
33       auto p(std::make_shared<item<Func>>(priority_, std::move(f)));
34       std::lock_guard<std::mutex> lock(context_.mutex_);
35       context_.queue_.push(p);
36       context_.condition_.notify_one();
37     }
38 
operator ==(const executor_type & a,const executor_type & b)39     friend bool operator==(const executor_type& a,
40         const executor_type& b) noexcept
41     {
42       return &a.context_ == &b.context_;
43     }
44 
operator !=(const executor_type & a,const executor_type & b)45     friend bool operator!=(const executor_type& a,
46         const executor_type& b) noexcept
47     {
48       return &a.context_ != &b.context_;
49     }
50 
51   private:
52     priority_scheduler& context_;
53     int priority_;
54   };
55 
~priority_scheduler()56   ~priority_scheduler() noexcept
57   {
58     shutdown();
59     destroy();
60   }
61 
get_executor(int pri=0)62   executor_type get_executor(int pri = 0) noexcept
63   {
64     return executor_type(*const_cast<priority_scheduler*>(this), pri);
65   }
66 
run()67   void run()
68   {
69     std::unique_lock<std::mutex> lock(mutex_);
70     for (;;)
71     {
72       condition_.wait(lock, [&]{ return stopped_ || !queue_.empty(); });
73       if (stopped_)
74         return;
75       auto p(queue_.top());
76       queue_.pop();
77       lock.unlock();
78       p->execute_(p);
79       lock.lock();
80     }
81   }
82 
stop()83   void stop()
84   {
85     std::lock_guard<std::mutex> lock(mutex_);
86     stopped_ = true;
87     condition_.notify_all();
88   }
89 
90 private:
91   struct item_base
92   {
93     int priority_;
94     void (*execute_)(std::shared_ptr<item_base>&);
95   };
96 
97   template <class Func>
98   struct item : item_base
99   {
itempriority_scheduler::item100     item(int pri, Func f) : function_(std::move(f))
101     {
102       priority_ = pri;
103       execute_ = [](std::shared_ptr<item_base>& p)
104       {
105         Func tmp(std::move(static_cast<item*>(p.get())->function_));
106         p.reset();
107         tmp();
108       };
109     }
110 
111     Func function_;
112   };
113 
114   struct item_comp
115   {
operator ()priority_scheduler::item_comp116     bool operator()(
117         const std::shared_ptr<item_base>& a,
118         const std::shared_ptr<item_base>& b)
119     {
120       return a->priority_ < b->priority_;
121     }
122   };
123 
124   std::mutex mutex_;
125   std::condition_variable condition_;
126   std::priority_queue<
127     std::shared_ptr<item_base>,
128     std::vector<std::shared_ptr<item_base>>,
129     item_comp> queue_;
130   bool stopped_ = false;
131 };
132 
main()133 int main()
134 {
135   priority_scheduler sched;
136   auto low = sched.get_executor(0);
137   auto med = sched.get_executor(1);
138   auto high = sched.get_executor(2);
139   dispatch(low, []{ std::cout << "1\n"; });
140   dispatch(low, []{ std::cout << "11\n"; });
141   dispatch(med, []{ std::cout << "2\n"; });
142   dispatch(med, []{ std::cout << "22\n"; });
143   dispatch(high, []{ std::cout << "3\n"; });
144   dispatch(high, []{ std::cout << "33\n"; });
145   dispatch(high, []{ std::cout << "333\n"; });
146   dispatch(sched.get_executor(-1), [&]{ sched.stop(); });
147   sched.run();
148 }
149