1 //
2 // chat_server.cpp
3 // ~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <cstdlib>
12 #include <deque>
13 #include <iostream>
14 #include <list>
15 #include <memory>
16 #include <set>
17 #include <utility>
18 #include <boost/asio.hpp>
19 #include "chat_message.hpp"
20
21 using boost::asio::ip::tcp;
22
23 //----------------------------------------------------------------------
24
25 typedef std::deque<chat_message> chat_message_queue;
26
27 //----------------------------------------------------------------------
28
29 class chat_participant
30 {
31 public:
~chat_participant()32 virtual ~chat_participant() {}
33 virtual void deliver(const chat_message& msg) = 0;
34 };
35
36 typedef std::shared_ptr<chat_participant> chat_participant_ptr;
37
38 //----------------------------------------------------------------------
39
40 class chat_room
41 {
42 public:
join(chat_participant_ptr participant)43 void join(chat_participant_ptr participant)
44 {
45 participants_.insert(participant);
46 for (auto msg: recent_msgs_)
47 participant->deliver(msg);
48 }
49
leave(chat_participant_ptr participant)50 void leave(chat_participant_ptr participant)
51 {
52 participants_.erase(participant);
53 }
54
deliver(const chat_message & msg)55 void deliver(const chat_message& msg)
56 {
57 recent_msgs_.push_back(msg);
58 while (recent_msgs_.size() > max_recent_msgs)
59 recent_msgs_.pop_front();
60
61 for (auto participant: participants_)
62 participant->deliver(msg);
63 }
64
65 private:
66 std::set<chat_participant_ptr> participants_;
67 enum { max_recent_msgs = 100 };
68 chat_message_queue recent_msgs_;
69 };
70
71 //----------------------------------------------------------------------
72
73 class chat_session
74 : public chat_participant,
75 public std::enable_shared_from_this<chat_session>
76 {
77 public:
chat_session(tcp::socket socket,chat_room & room)78 chat_session(tcp::socket socket, chat_room& room)
79 : socket_(std::move(socket)),
80 room_(room)
81 {
82 }
83
start()84 void start()
85 {
86 room_.join(shared_from_this());
87 do_read_header();
88 }
89
deliver(const chat_message & msg)90 void deliver(const chat_message& msg)
91 {
92 bool write_in_progress = !write_msgs_.empty();
93 write_msgs_.push_back(msg);
94 if (!write_in_progress)
95 {
96 do_write();
97 }
98 }
99
100 private:
do_read_header()101 void do_read_header()
102 {
103 auto self(shared_from_this());
104 boost::asio::async_read(socket_,
105 boost::asio::buffer(read_msg_.data(), chat_message::header_length),
106 [this, self](boost::system::error_code ec, std::size_t /*length*/)
107 {
108 if (!ec && read_msg_.decode_header())
109 {
110 do_read_body();
111 }
112 else
113 {
114 room_.leave(shared_from_this());
115 }
116 });
117 }
118
do_read_body()119 void do_read_body()
120 {
121 auto self(shared_from_this());
122 boost::asio::async_read(socket_,
123 boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
124 [this, self](boost::system::error_code ec, std::size_t /*length*/)
125 {
126 if (!ec)
127 {
128 room_.deliver(read_msg_);
129 do_read_header();
130 }
131 else
132 {
133 room_.leave(shared_from_this());
134 }
135 });
136 }
137
do_write()138 void do_write()
139 {
140 auto self(shared_from_this());
141 boost::asio::async_write(socket_,
142 boost::asio::buffer(write_msgs_.front().data(),
143 write_msgs_.front().length()),
144 [this, self](boost::system::error_code ec, std::size_t /*length*/)
145 {
146 if (!ec)
147 {
148 write_msgs_.pop_front();
149 if (!write_msgs_.empty())
150 {
151 do_write();
152 }
153 }
154 else
155 {
156 room_.leave(shared_from_this());
157 }
158 });
159 }
160
161 tcp::socket socket_;
162 chat_room& room_;
163 chat_message read_msg_;
164 chat_message_queue write_msgs_;
165 };
166
167 //----------------------------------------------------------------------
168
169 class chat_server
170 {
171 public:
chat_server(boost::asio::io_context & io_context,const tcp::endpoint & endpoint)172 chat_server(boost::asio::io_context& io_context,
173 const tcp::endpoint& endpoint)
174 : acceptor_(io_context, endpoint)
175 {
176 do_accept();
177 }
178
179 private:
do_accept()180 void do_accept()
181 {
182 acceptor_.async_accept(
183 [this](boost::system::error_code ec, tcp::socket socket)
184 {
185 if (!ec)
186 {
187 std::make_shared<chat_session>(std::move(socket), room_)->start();
188 }
189
190 do_accept();
191 });
192 }
193
194 tcp::acceptor acceptor_;
195 chat_room room_;
196 };
197
198 //----------------------------------------------------------------------
199
main(int argc,char * argv[])200 int main(int argc, char* argv[])
201 {
202 try
203 {
204 if (argc < 2)
205 {
206 std::cerr << "Usage: chat_server <port> [<port> ...]\n";
207 return 1;
208 }
209
210 boost::asio::io_context io_context;
211
212 std::list<chat_server> servers;
213 for (int i = 1; i < argc; ++i)
214 {
215 tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[i]));
216 servers.emplace_back(io_context, endpoint);
217 }
218
219 io_context.run();
220 }
221 catch (std::exception& e)
222 {
223 std::cerr << "Exception: " << e.what() << "\n";
224 }
225
226 return 0;
227 }
228