1 //
2 // chat_server.cpp
3 // ~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <algorithm>
12 #include <cstdlib>
13 #include <deque>
14 #include <iostream>
15 #include <list>
16 #include <set>
17 #include <boost/bind/bind.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/enable_shared_from_this.hpp>
20 #include <boost/asio.hpp>
21 #include "chat_message.hpp"
22
23 using boost::asio::ip::tcp;
24
25 //----------------------------------------------------------------------
26
27 typedef std::deque<chat_message> chat_message_queue;
28
29 //----------------------------------------------------------------------
30
31 class chat_participant
32 {
33 public:
~chat_participant()34 virtual ~chat_participant() {}
35 virtual void deliver(const chat_message& msg) = 0;
36 };
37
38 typedef boost::shared_ptr<chat_participant> chat_participant_ptr;
39
40 //----------------------------------------------------------------------
41
42 class chat_room
43 {
44 public:
join(chat_participant_ptr participant)45 void join(chat_participant_ptr participant)
46 {
47 participants_.insert(participant);
48 std::for_each(recent_msgs_.begin(), recent_msgs_.end(),
49 boost::bind(&chat_participant::deliver,
50 participant, boost::placeholders::_1));
51 }
52
leave(chat_participant_ptr participant)53 void leave(chat_participant_ptr participant)
54 {
55 participants_.erase(participant);
56 }
57
deliver(const chat_message & msg)58 void deliver(const chat_message& msg)
59 {
60 recent_msgs_.push_back(msg);
61 while (recent_msgs_.size() > max_recent_msgs)
62 recent_msgs_.pop_front();
63
64 std::for_each(participants_.begin(), participants_.end(),
65 boost::bind(&chat_participant::deliver,
66 boost::placeholders::_1, boost::ref(msg)));
67 }
68
69 private:
70 std::set<chat_participant_ptr> participants_;
71 enum { max_recent_msgs = 100 };
72 chat_message_queue recent_msgs_;
73 };
74
75 //----------------------------------------------------------------------
76
77 class chat_session
78 : public chat_participant,
79 public boost::enable_shared_from_this<chat_session>
80 {
81 public:
chat_session(boost::asio::io_context & io_context,chat_room & room)82 chat_session(boost::asio::io_context& io_context, chat_room& room)
83 : socket_(io_context),
84 room_(room)
85 {
86 }
87
socket()88 tcp::socket& socket()
89 {
90 return socket_;
91 }
92
start()93 void start()
94 {
95 room_.join(shared_from_this());
96 boost::asio::async_read(socket_,
97 boost::asio::buffer(read_msg_.data(), chat_message::header_length),
98 boost::bind(
99 &chat_session::handle_read_header, shared_from_this(),
100 boost::asio::placeholders::error));
101 }
102
deliver(const chat_message & msg)103 void deliver(const chat_message& msg)
104 {
105 bool write_in_progress = !write_msgs_.empty();
106 write_msgs_.push_back(msg);
107 if (!write_in_progress)
108 {
109 boost::asio::async_write(socket_,
110 boost::asio::buffer(write_msgs_.front().data(),
111 write_msgs_.front().length()),
112 boost::bind(&chat_session::handle_write, shared_from_this(),
113 boost::asio::placeholders::error));
114 }
115 }
116
handle_read_header(const boost::system::error_code & error)117 void handle_read_header(const boost::system::error_code& error)
118 {
119 if (!error && read_msg_.decode_header())
120 {
121 boost::asio::async_read(socket_,
122 boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
123 boost::bind(&chat_session::handle_read_body, shared_from_this(),
124 boost::asio::placeholders::error));
125 }
126 else
127 {
128 room_.leave(shared_from_this());
129 }
130 }
131
handle_read_body(const boost::system::error_code & error)132 void handle_read_body(const boost::system::error_code& error)
133 {
134 if (!error)
135 {
136 room_.deliver(read_msg_);
137 boost::asio::async_read(socket_,
138 boost::asio::buffer(read_msg_.data(), chat_message::header_length),
139 boost::bind(&chat_session::handle_read_header, shared_from_this(),
140 boost::asio::placeholders::error));
141 }
142 else
143 {
144 room_.leave(shared_from_this());
145 }
146 }
147
handle_write(const boost::system::error_code & error)148 void handle_write(const boost::system::error_code& error)
149 {
150 if (!error)
151 {
152 write_msgs_.pop_front();
153 if (!write_msgs_.empty())
154 {
155 boost::asio::async_write(socket_,
156 boost::asio::buffer(write_msgs_.front().data(),
157 write_msgs_.front().length()),
158 boost::bind(&chat_session::handle_write, shared_from_this(),
159 boost::asio::placeholders::error));
160 }
161 }
162 else
163 {
164 room_.leave(shared_from_this());
165 }
166 }
167
168 private:
169 tcp::socket socket_;
170 chat_room& room_;
171 chat_message read_msg_;
172 chat_message_queue write_msgs_;
173 };
174
175 typedef boost::shared_ptr<chat_session> chat_session_ptr;
176
177 //----------------------------------------------------------------------
178
179 class chat_server
180 {
181 public:
chat_server(boost::asio::io_context & io_context,const tcp::endpoint & endpoint)182 chat_server(boost::asio::io_context& io_context,
183 const tcp::endpoint& endpoint)
184 : io_context_(io_context),
185 acceptor_(io_context, endpoint)
186 {
187 start_accept();
188 }
189
start_accept()190 void start_accept()
191 {
192 chat_session_ptr new_session(new chat_session(io_context_, room_));
193 acceptor_.async_accept(new_session->socket(),
194 boost::bind(&chat_server::handle_accept, this, new_session,
195 boost::asio::placeholders::error));
196 }
197
handle_accept(chat_session_ptr session,const boost::system::error_code & error)198 void handle_accept(chat_session_ptr session,
199 const boost::system::error_code& error)
200 {
201 if (!error)
202 {
203 session->start();
204 }
205
206 start_accept();
207 }
208
209 private:
210 boost::asio::io_context& io_context_;
211 tcp::acceptor acceptor_;
212 chat_room room_;
213 };
214
215 typedef boost::shared_ptr<chat_server> chat_server_ptr;
216 typedef std::list<chat_server_ptr> chat_server_list;
217
218 //----------------------------------------------------------------------
219
main(int argc,char * argv[])220 int main(int argc, char* argv[])
221 {
222 try
223 {
224 if (argc < 2)
225 {
226 std::cerr << "Usage: chat_server <port> [<port> ...]\n";
227 return 1;
228 }
229
230 boost::asio::io_context io_context;
231
232 chat_server_list servers;
233 for (int i = 1; i < argc; ++i)
234 {
235 using namespace std; // For atoi.
236 tcp::endpoint endpoint(tcp::v4(), atoi(argv[i]));
237 chat_server_ptr server(new chat_server(io_context, endpoint));
238 servers.push_back(server);
239 }
240
241 io_context.run();
242 }
243 catch (std::exception& e)
244 {
245 std::cerr << "Exception: " << e.what() << "\n";
246 }
247
248 return 0;
249 }
250