• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include <boost/asio/any_io_executor.hpp>
2 #include <boost/asio/defer.hpp>
3 #include <boost/asio/post.hpp>
4 #include <boost/asio/strand.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <condition_variable>
7 #include <deque>
8 #include <memory>
9 #include <mutex>
10 #include <typeinfo>
11 #include <vector>
12 
13 using boost::asio::any_io_executor;
14 using boost::asio::defer;
15 using boost::asio::post;
16 using boost::asio::strand;
17 using boost::asio::system_executor;
18 
19 //------------------------------------------------------------------------------
20 // A tiny actor framework
21 // ~~~~~~~~~~~~~~~~~~~~~~
22 
23 class actor;
24 
25 // Used to identify the sender and recipient of messages.
26 typedef actor* actor_address;
27 
28 // Base class for all registered message handlers.
29 class message_handler_base
30 {
31 public:
~message_handler_base()32   virtual ~message_handler_base() {}
33 
34   // Used to determine which message handlers receive an incoming message.
35   virtual const std::type_info& message_id() const = 0;
36 };
37 
38 // Base class for a handler for a specific message type.
39 template <class Message>
40 class message_handler : public message_handler_base
41 {
42 public:
43   // Handle an incoming message.
44   virtual void handle_message(Message msg, actor_address from) = 0;
45 };
46 
47 // Concrete message handler for a specific message type.
48 template <class Actor, class Message>
49 class mf_message_handler : public message_handler<Message>
50 {
51 public:
52   // Construct a message handler to invoke the specified member function.
mf_message_handler(void (Actor::* mf)(Message,actor_address),Actor * a)53   mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
54     : function_(mf), actor_(a)
55   {
56   }
57 
58   // Used to determine which message handlers receive an incoming message.
message_id() const59   virtual const std::type_info& message_id() const
60   {
61     return typeid(Message);
62   }
63 
64   // Handle an incoming message.
handle_message(Message msg,actor_address from)65   virtual void handle_message(Message msg, actor_address from)
66   {
67     (actor_->*function_)(std::move(msg), from);
68   }
69 
70   // Determine whether the message handler represents the specified function.
is_function(void (Actor::* mf)(Message,actor_address)) const71   bool is_function(void (Actor::* mf)(Message, actor_address)) const
72   {
73     return mf == function_;
74   }
75 
76 private:
77   void (Actor::* function_)(Message, actor_address);
78   Actor* actor_;
79 };
80 
81 // Base class for all actors.
82 class actor
83 {
84 public:
~actor()85   virtual ~actor()
86   {
87   }
88 
89   // Obtain the actor's address for use as a message sender or recipient.
address()90   actor_address address()
91   {
92     return this;
93   }
94 
95   // Send a message from one actor to another.
96   template <class Message>
send(Message msg,actor_address from,actor_address to)97   friend void send(Message msg, actor_address from, actor_address to)
98   {
99     // Execute the message handler in the context of the target's executor.
100     post(to->executor_,
101       [=, msg=std::move(msg)]() mutable
102       {
103         to->call_handler(std::move(msg), from);
104       });
105   }
106 
107 protected:
108   // Construct the actor to use the specified executor for all message handlers.
actor(any_io_executor e)109   actor(any_io_executor e)
110     : executor_(std::move(e))
111   {
112   }
113 
114   // Register a handler for a specific message type. Duplicates are permitted.
115   template <class Actor, class Message>
register_handler(void (Actor::* mf)(Message,actor_address))116   void register_handler(void (Actor::* mf)(Message, actor_address))
117   {
118     handlers_.push_back(
119       std::make_shared<mf_message_handler<Actor, Message>>(
120         mf, static_cast<Actor*>(this)));
121   }
122 
123   // Deregister a handler. Removes only the first matching handler.
124   template <class Actor, class Message>
deregister_handler(void (Actor::* mf)(Message,actor_address))125   void deregister_handler(void (Actor::* mf)(Message, actor_address))
126   {
127     const std::type_info& id = typeid(Message);
128     for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
129     {
130       if ((*iter)->message_id() == id)
131       {
132         auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
133         if (mh->is_function(mf))
134         {
135           handlers_.erase(iter);
136           return;
137         }
138       }
139     }
140   }
141 
142   // Send a message from within a message handler.
143   template <class Message>
tail_send(Message msg,actor_address to)144   void tail_send(Message msg, actor_address to)
145   {
146     // Execute the message handler in the context of the target's executor.
147     defer(to->executor_,
148       [=, msg=std::move(msg), from=this]
149       {
150         to->call_handler(std::move(msg), from);
151       });
152   }
153 
154 private:
155   // Find the matching message handlers, if any, and call them.
156   template <class Message>
call_handler(Message msg,actor_address from)157   void call_handler(Message msg, actor_address from)
158   {
159     const std::type_info& message_id = typeid(Message);
160     for (auto& h: handlers_)
161     {
162       if (h->message_id() == message_id)
163       {
164         auto mh = static_cast<message_handler<Message>*>(h.get());
165         mh->handle_message(msg, from);
166       }
167     }
168   }
169 
170   // All messages associated with a single actor object should be processed
171   // non-concurrently. We use a strand to ensure non-concurrent execution even
172   // if the underlying executor may use multiple threads.
173   strand<any_io_executor> executor_;
174 
175   std::vector<std::shared_ptr<message_handler_base>> handlers_;
176 };
177 
178 // A concrete actor that allows synchronous message retrieval.
179 template <class Message>
180 class receiver : public actor
181 {
182 public:
receiver()183   receiver()
184     : actor(system_executor())
185   {
186     register_handler(&receiver::message_handler);
187   }
188 
189   // Block until a message has been received.
wait()190   Message wait()
191   {
192     std::unique_lock<std::mutex> lock(mutex_);
193     condition_.wait(lock, [this]{ return !message_queue_.empty(); });
194     Message msg(std::move(message_queue_.front()));
195     message_queue_.pop_front();
196     return msg;
197   }
198 
199 private:
200   // Handle a new message by adding it to the queue and waking a waiter.
message_handler(Message msg,actor_address)201   void message_handler(Message msg, actor_address /* from */)
202   {
203     std::lock_guard<std::mutex> lock(mutex_);
204     message_queue_.push_back(std::move(msg));
205     condition_.notify_one();
206   }
207 
208   std::mutex mutex_;
209   std::condition_variable condition_;
210   std::deque<Message> message_queue_;
211 };
212 
213 //------------------------------------------------------------------------------
214 
215 #include <boost/asio/thread_pool.hpp>
216 #include <iostream>
217 
218 using boost::asio::thread_pool;
219 
220 class member : public actor
221 {
222 public:
member(any_io_executor e)223   explicit member(any_io_executor e)
224     : actor(std::move(e))
225   {
226     register_handler(&member::init_handler);
227   }
228 
229 private:
init_handler(actor_address next,actor_address from)230   void init_handler(actor_address next, actor_address from)
231   {
232     next_ = next;
233     caller_ = from;
234 
235     register_handler(&member::token_handler);
236     deregister_handler(&member::init_handler);
237   }
238 
token_handler(int token,actor_address)239   void token_handler(int token, actor_address /*from*/)
240   {
241     int msg(token);
242     actor_address to(caller_);
243 
244     if (token > 0)
245     {
246       msg = token - 1;
247       to = next_;
248     }
249 
250     tail_send(msg, to);
251   }
252 
253   actor_address next_;
254   actor_address caller_;
255 };
256 
main()257 int main()
258 {
259   const std::size_t num_threads = 16;
260   const int num_hops = 50000000;
261   const std::size_t num_actors = 503;
262   const int token_value = (num_hops + num_actors - 1) / num_actors;
263   const std::size_t actors_per_thread = num_actors / num_threads;
264 
265   struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
266   single_thread_pool pools[num_threads];
267   std::vector<std::shared_ptr<member>> members(num_actors);
268   receiver<int> rcvr;
269 
270   // Create the member actors.
271   for (std::size_t i = 0; i < num_actors; ++i)
272     members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
273 
274   // Initialise the actors by passing each one the address of the next actor in the ring.
275   for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
276     send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
277 
278   // Send exactly one token to each actor, all with the same initial value, rounding up if required.
279   for (std::size_t i = 0; i < num_actors; ++i)
280     send(token_value, rcvr.address(), members[i]->address());
281 
282   // Wait for all signal messages, indicating the tokens have all reached zero.
283   for (std::size_t i = 0; i < num_actors; ++i)
284     rcvr.wait();
285 }
286