• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // chat_server.cpp
3 // ~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #include <algorithm>
12 #include <cstdlib>
13 #include <deque>
14 #include <iostream>
15 #include <list>
16 #include <set>
17 #include <boost/bind/bind.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/enable_shared_from_this.hpp>
20 #include <boost/asio.hpp>
21 #include "chat_message.hpp"
22 
23 using boost::asio::ip::tcp;
24 
25 //----------------------------------------------------------------------
26 
27 typedef std::deque<chat_message> chat_message_queue;
28 
29 //----------------------------------------------------------------------
30 
31 class chat_participant
32 {
33 public:
~chat_participant()34   virtual ~chat_participant() {}
35   virtual void deliver(const chat_message& msg) = 0;
36 };
37 
38 typedef boost::shared_ptr<chat_participant> chat_participant_ptr;
39 
40 //----------------------------------------------------------------------
41 
42 class chat_room
43 {
44 public:
join(chat_participant_ptr participant)45   void join(chat_participant_ptr participant)
46   {
47     participants_.insert(participant);
48     std::for_each(recent_msgs_.begin(), recent_msgs_.end(),
49         boost::bind(&chat_participant::deliver,
50           participant, boost::placeholders::_1));
51   }
52 
leave(chat_participant_ptr participant)53   void leave(chat_participant_ptr participant)
54   {
55     participants_.erase(participant);
56   }
57 
deliver(const chat_message & msg)58   void deliver(const chat_message& msg)
59   {
60     recent_msgs_.push_back(msg);
61     while (recent_msgs_.size() > max_recent_msgs)
62       recent_msgs_.pop_front();
63 
64     std::for_each(participants_.begin(), participants_.end(),
65         boost::bind(&chat_participant::deliver,
66           boost::placeholders::_1, boost::ref(msg)));
67   }
68 
69 private:
70   std::set<chat_participant_ptr> participants_;
71   enum { max_recent_msgs = 100 };
72   chat_message_queue recent_msgs_;
73 };
74 
75 //----------------------------------------------------------------------
76 
77 class chat_session
78   : public chat_participant,
79     public boost::enable_shared_from_this<chat_session>
80 {
81 public:
chat_session(boost::asio::io_context & io_context,chat_room & room)82   chat_session(boost::asio::io_context& io_context, chat_room& room)
83     : socket_(io_context),
84       room_(room)
85   {
86   }
87 
socket()88   tcp::socket& socket()
89   {
90     return socket_;
91   }
92 
start()93   void start()
94   {
95     room_.join(shared_from_this());
96     boost::asio::async_read(socket_,
97         boost::asio::buffer(read_msg_.data(), chat_message::header_length),
98         boost::bind(
99           &chat_session::handle_read_header, shared_from_this(),
100           boost::asio::placeholders::error));
101   }
102 
deliver(const chat_message & msg)103   void deliver(const chat_message& msg)
104   {
105     bool write_in_progress = !write_msgs_.empty();
106     write_msgs_.push_back(msg);
107     if (!write_in_progress)
108     {
109       boost::asio::async_write(socket_,
110           boost::asio::buffer(write_msgs_.front().data(),
111             write_msgs_.front().length()),
112           boost::bind(&chat_session::handle_write, shared_from_this(),
113             boost::asio::placeholders::error));
114     }
115   }
116 
handle_read_header(const boost::system::error_code & error)117   void handle_read_header(const boost::system::error_code& error)
118   {
119     if (!error && read_msg_.decode_header())
120     {
121       boost::asio::async_read(socket_,
122           boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
123           boost::bind(&chat_session::handle_read_body, shared_from_this(),
124             boost::asio::placeholders::error));
125     }
126     else
127     {
128       room_.leave(shared_from_this());
129     }
130   }
131 
handle_read_body(const boost::system::error_code & error)132   void handle_read_body(const boost::system::error_code& error)
133   {
134     if (!error)
135     {
136       room_.deliver(read_msg_);
137       boost::asio::async_read(socket_,
138           boost::asio::buffer(read_msg_.data(), chat_message::header_length),
139           boost::bind(&chat_session::handle_read_header, shared_from_this(),
140             boost::asio::placeholders::error));
141     }
142     else
143     {
144       room_.leave(shared_from_this());
145     }
146   }
147 
handle_write(const boost::system::error_code & error)148   void handle_write(const boost::system::error_code& error)
149   {
150     if (!error)
151     {
152       write_msgs_.pop_front();
153       if (!write_msgs_.empty())
154       {
155         boost::asio::async_write(socket_,
156             boost::asio::buffer(write_msgs_.front().data(),
157               write_msgs_.front().length()),
158             boost::bind(&chat_session::handle_write, shared_from_this(),
159               boost::asio::placeholders::error));
160       }
161     }
162     else
163     {
164       room_.leave(shared_from_this());
165     }
166   }
167 
168 private:
169   tcp::socket socket_;
170   chat_room& room_;
171   chat_message read_msg_;
172   chat_message_queue write_msgs_;
173 };
174 
175 typedef boost::shared_ptr<chat_session> chat_session_ptr;
176 
177 //----------------------------------------------------------------------
178 
179 class chat_server
180 {
181 public:
chat_server(boost::asio::io_context & io_context,const tcp::endpoint & endpoint)182   chat_server(boost::asio::io_context& io_context,
183       const tcp::endpoint& endpoint)
184     : io_context_(io_context),
185       acceptor_(io_context, endpoint)
186   {
187     start_accept();
188   }
189 
start_accept()190   void start_accept()
191   {
192     chat_session_ptr new_session(new chat_session(io_context_, room_));
193     acceptor_.async_accept(new_session->socket(),
194         boost::bind(&chat_server::handle_accept, this, new_session,
195           boost::asio::placeholders::error));
196   }
197 
handle_accept(chat_session_ptr session,const boost::system::error_code & error)198   void handle_accept(chat_session_ptr session,
199       const boost::system::error_code& error)
200   {
201     if (!error)
202     {
203       session->start();
204     }
205 
206     start_accept();
207   }
208 
209 private:
210   boost::asio::io_context& io_context_;
211   tcp::acceptor acceptor_;
212   chat_room room_;
213 };
214 
215 typedef boost::shared_ptr<chat_server> chat_server_ptr;
216 typedef std::list<chat_server_ptr> chat_server_list;
217 
218 //----------------------------------------------------------------------
219 
main(int argc,char * argv[])220 int main(int argc, char* argv[])
221 {
222   try
223   {
224     if (argc < 2)
225     {
226       std::cerr << "Usage: chat_server <port> [<port> ...]\n";
227       return 1;
228     }
229 
230     boost::asio::io_context io_context;
231 
232     chat_server_list servers;
233     for (int i = 1; i < argc; ++i)
234     {
235       using namespace std; // For atoi.
236       tcp::endpoint endpoint(tcp::v4(), atoi(argv[i]));
237       chat_server_ptr server(new chat_server(io_context, endpoint));
238       servers.push_back(server);
239     }
240 
241     io_context.run();
242   }
243   catch (std::exception& e)
244   {
245     std::cerr << "Exception: " << e.what() << "\n";
246   }
247 
248   return 0;
249 }
250