• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //          Copyright Oliver Kowalke 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 //    (See accompanying file LICENSE_1_0.txt or copy at
4 //          http://www.boost.org/LICENSE_1_0.txt)
5 
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
13 
14 #include <boost/asio.hpp>
15 #include <boost/utility.hpp>
16 
17 #include <boost/fiber/all.hpp>
18 #include "../round_robin.hpp"
19 #include "../yield.hpp"
20 
21 using boost::asio::ip::tcp;
22 
// capacity, in bytes, of the fixed-size message buffers used by the sessions
const std::size_t max_length = 1024;

// only the shared handle type is needed up here; the class itself is
// defined after the registry
class subscriber_session;
using subscriber_session_ptr = std::shared_ptr< subscriber_session >;
27 
28 // a queue has n subscribers (subscriptions)
29 // this class holds a list of subcribers for one queue
30 class subscriptions {
31 public:
32     ~subscriptions();
33 
34     // subscribe to this queue
subscribe(subscriber_session_ptr const & s)35     void subscribe( subscriber_session_ptr const& s) {
36         subscribers_.insert( s);
37     }
38 
39     // unsubscribe from this queue
unsubscribe(subscriber_session_ptr const & s)40     void unsubscribe( subscriber_session_ptr const& s) {
41         subscribers_.erase(s);
42     }
43 
44     // publish a message, e.g. push this message to all subscribers
45     void publish( std::string const& msg);
46 
47 private:
48     // list of subscribers
49     std::set< subscriber_session_ptr >  subscribers_;
50 };
51 
52 // a class to register queues and to subsribe clients to this queues
53 class registry : private boost::noncopyable {
54 private:
55     typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
56     typedef queues_cont::iterator queues_iter;
57 
58     boost::fibers::mutex    mtx_;
59     queues_cont           queues_;
60 
register_queue_(std::string const & queue)61     void register_queue_( std::string const& queue) {
62         if ( queues_.end() != queues_.find( queue) ) {
63             throw std::runtime_error("queue already exists");
64         }
65         queues_[queue] = std::make_shared< subscriptions >();
66         std::cout << "new queue '" << queue << "' registered" << std::endl;
67     }
68 
unregister_queue_(std::string const & queue)69     void unregister_queue_( std::string const& queue) {
70         queues_.erase( queue);
71         std::cout << "queue '" << queue << "' unregistered" << std::endl;
72     }
73 
subscribe_(std::string const & queue,subscriber_session_ptr s)74     void subscribe_( std::string const& queue, subscriber_session_ptr s) {
75         queues_iter iter = queues_.find( queue);
76         if ( queues_.end() == iter ) {
77             throw std::runtime_error("queue does not exist");
78         }
79         iter->second->subscribe( s);
80         std::cout << "new subscription to queue '" << queue << "'" << std::endl;
81     }
82 
unsubscribe_(std::string const & queue,subscriber_session_ptr s)83     void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
84         queues_iter iter = queues_.find( queue);
85         if ( queues_.end() != iter ) {
86             iter->second->unsubscribe( s);
87         }
88     }
89 
publish_(std::string const & queue,std::string const & msg)90     void publish_( std::string const& queue, std::string const& msg) {
91         queues_iter iter = queues_.find( queue);
92         if ( queues_.end() == iter ) {
93             throw std::runtime_error("queue does not exist");
94         }
95         iter->second->publish( msg);
96         std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
97     }
98 
99 public:
100     // add a queue to registry
register_queue(std::string const & queue)101     void register_queue( std::string const& queue) {
102         std::unique_lock< boost::fibers::mutex > lk( mtx_);
103         register_queue_( queue);
104     }
105 
106     // remove a queue from registry
unregister_queue(std::string const & queue)107     void unregister_queue( std::string const& queue) {
108         std::unique_lock< boost::fibers::mutex > lk( mtx_);
109         unregister_queue_( queue);
110     }
111 
112     // subscribe to a queue
subscribe(std::string const & queue,subscriber_session_ptr s)113     void subscribe( std::string const& queue, subscriber_session_ptr s) {
114         std::unique_lock< boost::fibers::mutex > lk( mtx_);
115         subscribe_( queue, s);
116     }
117 
118     // unsubscribe from a queue
unsubscribe(std::string const & queue,subscriber_session_ptr s)119     void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
120         std::unique_lock< boost::fibers::mutex > lk( mtx_);
121         unsubscribe_( queue, s);
122     }
123 
124     // publish a message to all subscribers registerd to the queue
publish(std::string const & queue,std::string const & msg)125     void publish( std::string const& queue, std::string const& msg) {
126         std::unique_lock< boost::fibers::mutex > lk( mtx_);
127         publish_( queue, msg);
128     }
129 };
130 
131 // a subscriber subscribes to a given queue in order to receive messages published on this queue
132 class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
133 public:
subscriber_session(std::shared_ptr<boost::asio::io_context> const & io_context,registry & reg)134     explicit subscriber_session( std::shared_ptr< boost::asio::io_context > const& io_context, registry & reg) :
135         socket_( * io_context),
136         reg_( reg) {
137     }
138 
socket()139     tcp::socket& socket() {
140         return socket_;
141     }
142 
143     // this function is executed inside the fiber
run()144     void run() {
145         std::string queue;
146         try {
147             boost::system::error_code ec;
148             // read first message == queue name
149             // async_ready() returns if the the complete message is read
150             // until this the fiber is suspended until the complete message
151             // is read int the given buffer 'data'
152             boost::asio::async_read(
153                     socket_,
154                     boost::asio::buffer( data_),
155                     boost::fibers::asio::yield[ec]);
156             if ( ec) {
157                 throw std::runtime_error("no queue from subscriber");
158             }
159             // first message ist equal to the queue name the publisher
160             // publishes to
161             queue = data_;
162             // subscribe to new queue
163             reg_.subscribe( queue, shared_from_this() );
164             // read published messages
165             for (;;) {
166                 // wait for a conditon-variable for new messages
167                 // the fiber will be suspended until the condtion
168                 // becomes true and the fiber is resumed
169                 // published message is stored in buffer 'data_'
170                 std::unique_lock< boost::fibers::mutex > lk( mtx_);
171                 cond_.wait( lk);
172                 std::string data( data_);
173                 lk.unlock();
174                 // message '<fini>' terminates subscription
175                 if ( "<fini>" == data) {
176                     break;
177                 }
178                 // async. write message to socket connected with
179                 // subscriber
180                 // async_write() returns if the complete message was writen
181                 // the fiber is suspended in the meanwhile
182                 boost::asio::async_write(
183                         socket_,
184                         boost::asio::buffer( data, data.size() ),
185                         boost::fibers::asio::yield[ec]);
186                 if ( ec == boost::asio::error::eof) {
187                     break; //connection closed cleanly by peer
188                 } else if ( ec) {
189                     throw boost::system::system_error( ec); //some other error
190                 }
191                 std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
192             }
193         } catch ( std::exception const& e) {
194             std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
195         }
196         // close socket
197         socket_.close();
198         // unregister queue
199         reg_.unsubscribe( queue, shared_from_this() );
200     }
201 
202     // called from publisher_session (running in other fiber)
publish(std::string const & msg)203     void publish( std::string const& msg) {
204         std::unique_lock< boost::fibers::mutex > lk( mtx_);
205         std::memset( data_, '\0', sizeof( data_));
206         std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size()));
207         cond_.notify_one();
208     }
209 
210 private:
211     tcp::socket                         socket_;
212     registry                        &   reg_;
213     boost::fibers::mutex                mtx_;
214     boost::fibers::condition_variable   cond_;
215     // fixed size message
216     char                                data_[max_length];
217 };
218 
219 
~subscriptions()220 subscriptions::~subscriptions() {
221     for ( subscriber_session_ptr s : subscribers_) {
222         s->publish("<fini>");
223     }
224 }
225 
226 void
publish(std::string const & msg)227 subscriptions::publish( std::string const& msg) {
228     for ( subscriber_session_ptr s : subscribers_) {
229         s->publish( msg);
230     }
231 }
232 
233 // a publisher publishes messages on its queue
234 // subscriber might register to this queue to get the published messages
235 class publisher_session : public std::enable_shared_from_this< publisher_session > {
236 public:
publisher_session(std::shared_ptr<boost::asio::io_context> const & io_context,registry & reg)237     explicit publisher_session( std::shared_ptr< boost::asio::io_context > const& io_context, registry & reg) :
238         socket_( * io_context),
239         reg_( reg) {
240     }
241 
socket()242     tcp::socket& socket() {
243         return socket_;
244     }
245 
246     // this function is executed inside the fiber
run()247     void run() {
248         std::string queue;
249         try {
250             boost::system::error_code ec;
251             // fixed size message
252             char data[max_length];
253             // read first message == queue name
254             // async_ready() returns if the the complete message is read
255             // until this the fiber is suspended until the complete message
256             // is read int the given buffer 'data'
257             boost::asio::async_read(
258                     socket_,
259                     boost::asio::buffer( data),
260                     boost::fibers::asio::yield[ec]);
261             if ( ec) {
262                 throw std::runtime_error("no queue from publisher");
263             }
264             // first message ist equal to the queue name the publisher
265             // publishes to
266             queue = data;
267             // register the new queue
268             reg_.register_queue( queue);
269             // start publishing messages
270             for (;;) {
271                 // read message from publisher asyncronous
272                 // async_read() suspends this fiber until the complete emssage is read
273                 // and stored in the given buffer 'data'
274                 boost::asio::async_read(
275                         socket_,
276                         boost::asio::buffer( data),
277                         boost::fibers::asio::yield[ec]);
278                 if ( ec == boost::asio::error::eof) {
279                     break; //connection closed cleanly by peer
280                 } else if ( ec) {
281                     throw boost::system::system_error( ec); //some other error
282                 }
283                 // publish message to all subscribers
284                 reg_.publish( queue, std::string( data) );
285             }
286         } catch ( std::exception const& e) {
287             std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
288         }
289         // close socket
290         socket_.close();
291         // unregister queue
292         reg_.unregister_queue( queue);
293     }
294 
295 private:
296     tcp::socket         socket_;
297     registry        &   reg_;
298 };
299 
300 typedef std::shared_ptr< publisher_session > publisher_session_ptr;
301 
302 // function accepts connections requests from clients acting as a publisher
accept_publisher(std::shared_ptr<boost::asio::io_context> const & io_context,unsigned short port,registry & reg)303 void accept_publisher( std::shared_ptr< boost::asio::io_context > const& io_context,
304                        unsigned short port,
305                        registry & reg) {
306     // create TCP-acceptor
307     tcp::acceptor acceptor( * io_context, tcp::endpoint( tcp::v4(), port) );
308     // loop for accepting connection requests
309     for (;;) {
310         boost::system::error_code ec;
311         // create new publisher-session
312         // this instance will be associated with one publisher
313         publisher_session_ptr new_publisher_session =
314             std::make_shared< publisher_session >( io_context, std::ref( reg) );
315         // async. accept of new connection request
316         // this function will suspend this execution context (fiber) until a
317         // connection was established, after returning from this function a new client (publisher)
318         // is connected
319         acceptor.async_accept(
320                 new_publisher_session->socket(),
321                 boost::fibers::asio::yield[ec]);
322         if ( ! ec) {
323             // run the new publisher in its own fiber (one fiber for one client)
324             boost::fibers::fiber(
325                 std::bind( & publisher_session::run, new_publisher_session) ).detach();
326         }
327     }
328 }
329 
330 // function accepts connections requests from clients acting as a subscriber
accept_subscriber(std::shared_ptr<boost::asio::io_context> const & io_context,unsigned short port,registry & reg)331 void accept_subscriber( std::shared_ptr< boost::asio::io_context > const& io_context,
332                         unsigned short port,
333                         registry & reg) {
334     // create TCP-acceptor
335     tcp::acceptor acceptor( * io_context, tcp::endpoint( tcp::v4(), port) );
336     // loop for accepting connection requests
337     for (;;) {
338         boost::system::error_code ec;
339         // create new subscriber-session
340         // this instance will be associated with one subscriber
341         subscriber_session_ptr new_subscriber_session =
342             std::make_shared< subscriber_session >( io_context, std::ref( reg) );
343         // async. accept of new connection request
344         // this function will suspend this execution context (fiber) until a
345         // connection was established, after returning from this function a new client (subscriber)
346         // is connected
347         acceptor.async_accept(
348                 new_subscriber_session->socket(),
349                 boost::fibers::asio::yield[ec]);
350         if ( ! ec) {
351             // run the new subscriber in its own fiber (one fiber for one client)
352             boost::fibers::fiber(
353                 std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
354         }
355     }
356 }
357 
358 
main(int argc,char * argv[])359 int main( int argc, char* argv[]) {
360     try {
361         // create io_context for async. I/O
362         std::shared_ptr< boost::asio::io_context > io_context = std::make_shared< boost::asio::io_context >();
363         // register asio scheduler
364         boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_context);
365         // registry for queues and its subscription
366         registry reg;
367         // create an acceptor for publishers, run it as fiber
368         boost::fibers::fiber(
369             accept_publisher, std::ref( io_context), 9997, std::ref( reg) ).detach();
370         // create an acceptor for subscribers, run it as fiber
371         boost::fibers::fiber(
372             accept_subscriber, std::ref( io_context), 9998, std::ref( reg) ).detach();
373         // dispatch
374         io_context->run();
375         return EXIT_SUCCESS;
376     } catch ( std::exception const& e) {
377         std::cerr << "Exception: " << e.what() << "\n";
378     }
379 
380     return EXIT_FAILURE;
381 }
382