• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // chat_server.cpp
3 // ~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #include <cstdlib>
12 #include <deque>
13 #include <iostream>
14 #include <list>
15 #include <memory>
16 #include <set>
17 #include <string>
18 #include <utility>
19 #include <boost/asio/awaitable.hpp>
20 #include <boost/asio/detached.hpp>
21 #include <boost/asio/co_spawn.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/ip/tcp.hpp>
24 #include <boost/asio/read_until.hpp>
25 #include <boost/asio/redirect_error.hpp>
26 #include <boost/asio/signal_set.hpp>
27 #include <boost/asio/steady_timer.hpp>
28 #include <boost/asio/use_awaitable.hpp>
29 #include <boost/asio/write.hpp>
30 
31 using boost::asio::ip::tcp;
32 using boost::asio::awaitable;
33 using boost::asio::co_spawn;
34 using boost::asio::detached;
35 using boost::asio::redirect_error;
36 using boost::asio::use_awaitable;
37 
38 //----------------------------------------------------------------------
39 
// Abstract interface for anything that can receive chat messages.
// Implementations decide what "delivering" a message means for them.
class chat_participant
{
public:
  // Virtual so that objects can be destroyed through a base-class pointer.
  virtual ~chat_participant() = default;

  // Hands one message to this participant.
  virtual void deliver(const std::string& msg) = 0;
};
46 
47 typedef std::shared_ptr<chat_participant> chat_participant_ptr;
48 
49 //----------------------------------------------------------------------
50 
51 class chat_room
52 {
53 public:
join(chat_participant_ptr participant)54   void join(chat_participant_ptr participant)
55   {
56     participants_.insert(participant);
57     for (auto msg: recent_msgs_)
58       participant->deliver(msg);
59   }
60 
leave(chat_participant_ptr participant)61   void leave(chat_participant_ptr participant)
62   {
63     participants_.erase(participant);
64   }
65 
deliver(const std::string & msg)66   void deliver(const std::string& msg)
67   {
68     recent_msgs_.push_back(msg);
69     while (recent_msgs_.size() > max_recent_msgs)
70       recent_msgs_.pop_front();
71 
72     for (auto participant: participants_)
73       participant->deliver(msg);
74   }
75 
76 private:
77   std::set<chat_participant_ptr> participants_;
78   enum { max_recent_msgs = 100 };
79   std::deque<std::string> recent_msgs_;
80 };
81 
82 //----------------------------------------------------------------------
83 
84 class chat_session
85   : public chat_participant,
86     public std::enable_shared_from_this<chat_session>
87 {
88 public:
chat_session(tcp::socket socket,chat_room & room)89   chat_session(tcp::socket socket, chat_room& room)
90     : socket_(std::move(socket)),
91       timer_(socket_.get_executor()),
92       room_(room)
93   {
94     timer_.expires_at(std::chrono::steady_clock::time_point::max());
95   }
96 
start()97   void start()
98   {
99     room_.join(shared_from_this());
100 
101     co_spawn(socket_.get_executor(),
102         [self = shared_from_this()]{ return self->reader(); },
103         detached);
104 
105     co_spawn(socket_.get_executor(),
106         [self = shared_from_this()]{ return self->writer(); },
107         detached);
108   }
109 
deliver(const std::string & msg)110   void deliver(const std::string& msg)
111   {
112     write_msgs_.push_back(msg);
113     timer_.cancel_one();
114   }
115 
116 private:
reader()117   awaitable<void> reader()
118   {
119     try
120     {
121       for (std::string read_msg;;)
122       {
123         std::size_t n = co_await boost::asio::async_read_until(socket_,
124             boost::asio::dynamic_buffer(read_msg, 1024), "\n", use_awaitable);
125 
126         room_.deliver(read_msg.substr(0, n));
127         read_msg.erase(0, n);
128       }
129     }
130     catch (std::exception&)
131     {
132       stop();
133     }
134   }
135 
writer()136   awaitable<void> writer()
137   {
138     try
139     {
140       while (socket_.is_open())
141       {
142         if (write_msgs_.empty())
143         {
144           boost::system::error_code ec;
145           co_await timer_.async_wait(redirect_error(use_awaitable, ec));
146         }
147         else
148         {
149           co_await boost::asio::async_write(socket_,
150               boost::asio::buffer(write_msgs_.front()), use_awaitable);
151           write_msgs_.pop_front();
152         }
153       }
154     }
155     catch (std::exception&)
156     {
157       stop();
158     }
159   }
160 
stop()161   void stop()
162   {
163     room_.leave(shared_from_this());
164     socket_.close();
165     timer_.cancel();
166   }
167 
168   tcp::socket socket_;
169   boost::asio::steady_timer timer_;
170   chat_room& room_;
171   std::deque<std::string> write_msgs_;
172 };
173 
174 //----------------------------------------------------------------------
175 
listener(tcp::acceptor acceptor)176 awaitable<void> listener(tcp::acceptor acceptor)
177 {
178   chat_room room;
179 
180   for (;;)
181   {
182     std::make_shared<chat_session>(
183         co_await acceptor.async_accept(use_awaitable),
184         room
185       )->start();
186   }
187 }
188 
189 //----------------------------------------------------------------------
190 
main(int argc,char * argv[])191 int main(int argc, char* argv[])
192 {
193   try
194   {
195     if (argc < 2)
196     {
197       std::cerr << "Usage: chat_server <port> [<port> ...]\n";
198       return 1;
199     }
200 
201     boost::asio::io_context io_context(1);
202 
203     for (int i = 1; i < argc; ++i)
204     {
205       unsigned short port = std::atoi(argv[i]);
206       co_spawn(io_context,
207           listener(tcp::acceptor(io_context, {tcp::v4(), port})),
208           detached);
209     }
210 
211     boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
212     signals.async_wait([&](auto, auto){ io_context.stop(); });
213 
214     io_context.run();
215   }
216   catch (std::exception& e)
217   {
218     std::cerr << "Exception: " << e.what() << "\n";
219   }
220 
221   return 0;
222 }
223