1 /*
2 * Copyright 2012 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "examples/peerconnection/client/peer_connection_client.h"
12
13 #include "api/units/time_delta.h"
14 #include "examples/peerconnection/client/defaults.h"
15 #include "rtc_base/checks.h"
16 #include "rtc_base/logging.h"
17 #include "rtc_base/net_helpers.h"
18
19 namespace {
20
21 // This is our magical hangup signal.
22 constexpr char kByeMessage[] = "BYE";
23 // Delay between server connection retries, in milliseconds
24 constexpr webrtc::TimeDelta kReconnectDelay = webrtc::TimeDelta::Seconds(2);
25
CreateClientSocket(int family)26 rtc::Socket* CreateClientSocket(int family) {
27 rtc::Thread* thread = rtc::Thread::Current();
28 RTC_DCHECK(thread != NULL);
29 return thread->socketserver()->CreateSocket(family, SOCK_STREAM);
30 }
31
32 } // namespace
33
PeerConnectionClient()34 PeerConnectionClient::PeerConnectionClient()
35 : callback_(NULL), resolver_(NULL), state_(NOT_CONNECTED), my_id_(-1) {}
36
37 PeerConnectionClient::~PeerConnectionClient() = default;
38
InitSocketSignals()39 void PeerConnectionClient::InitSocketSignals() {
40 RTC_DCHECK(control_socket_.get() != NULL);
41 RTC_DCHECK(hanging_get_.get() != NULL);
42 control_socket_->SignalCloseEvent.connect(this,
43 &PeerConnectionClient::OnClose);
44 hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose);
45 control_socket_->SignalConnectEvent.connect(this,
46 &PeerConnectionClient::OnConnect);
47 hanging_get_->SignalConnectEvent.connect(
48 this, &PeerConnectionClient::OnHangingGetConnect);
49 control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead);
50 hanging_get_->SignalReadEvent.connect(
51 this, &PeerConnectionClient::OnHangingGetRead);
52 }
53
id() const54 int PeerConnectionClient::id() const {
55 return my_id_;
56 }
57
is_connected() const58 bool PeerConnectionClient::is_connected() const {
59 return my_id_ != -1;
60 }
61
peers() const62 const Peers& PeerConnectionClient::peers() const {
63 return peers_;
64 }
65
RegisterObserver(PeerConnectionClientObserver * callback)66 void PeerConnectionClient::RegisterObserver(
67 PeerConnectionClientObserver* callback) {
68 RTC_DCHECK(!callback_);
69 callback_ = callback;
70 }
71
Connect(const std::string & server,int port,const std::string & client_name)72 void PeerConnectionClient::Connect(const std::string& server,
73 int port,
74 const std::string& client_name) {
75 RTC_DCHECK(!server.empty());
76 RTC_DCHECK(!client_name.empty());
77
78 if (state_ != NOT_CONNECTED) {
79 RTC_LOG(LS_WARNING)
80 << "The client must not be connected before you can call Connect()";
81 callback_->OnServerConnectionFailure();
82 return;
83 }
84
85 if (server.empty() || client_name.empty()) {
86 callback_->OnServerConnectionFailure();
87 return;
88 }
89
90 if (port <= 0)
91 port = kDefaultServerPort;
92
93 server_address_.SetIP(server);
94 server_address_.SetPort(port);
95 client_name_ = client_name;
96
97 if (server_address_.IsUnresolvedIP()) {
98 state_ = RESOLVING;
99 resolver_ = new rtc::AsyncResolver();
100 resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
101 resolver_->Start(server_address_);
102 } else {
103 DoConnect();
104 }
105 }
106
OnResolveResult(rtc::AsyncResolverInterface * resolver)107 void PeerConnectionClient::OnResolveResult(
108 rtc::AsyncResolverInterface* resolver) {
109 if (resolver_->GetError() != 0) {
110 callback_->OnServerConnectionFailure();
111 resolver_->Destroy(false);
112 resolver_ = NULL;
113 state_ = NOT_CONNECTED;
114 } else {
115 server_address_ = resolver_->address();
116 DoConnect();
117 }
118 }
119
DoConnect()120 void PeerConnectionClient::DoConnect() {
121 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
122 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
123 InitSocketSignals();
124 char buffer[1024];
125 snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n",
126 client_name_.c_str());
127 onconnect_data_ = buffer;
128
129 bool ret = ConnectControlSocket();
130 if (ret)
131 state_ = SIGNING_IN;
132 if (!ret) {
133 callback_->OnServerConnectionFailure();
134 }
135 }
136
SendToPeer(int peer_id,const std::string & message)137 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
138 if (state_ != CONNECTED)
139 return false;
140
141 RTC_DCHECK(is_connected());
142 RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
143 if (!is_connected() || peer_id == -1)
144 return false;
145
146 char headers[1024];
147 snprintf(headers, sizeof(headers),
148 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
149 "Content-Length: %zu\r\n"
150 "Content-Type: text/plain\r\n"
151 "\r\n",
152 my_id_, peer_id, message.length());
153 onconnect_data_ = headers;
154 onconnect_data_ += message;
155 return ConnectControlSocket();
156 }
157
SendHangUp(int peer_id)158 bool PeerConnectionClient::SendHangUp(int peer_id) {
159 return SendToPeer(peer_id, kByeMessage);
160 }
161
IsSendingMessage()162 bool PeerConnectionClient::IsSendingMessage() {
163 return state_ == CONNECTED &&
164 control_socket_->GetState() != rtc::Socket::CS_CLOSED;
165 }
166
SignOut()167 bool PeerConnectionClient::SignOut() {
168 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
169 return true;
170
171 if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
172 hanging_get_->Close();
173
174 if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
175 state_ = SIGNING_OUT;
176
177 if (my_id_ != -1) {
178 char buffer[1024];
179 snprintf(buffer, sizeof(buffer),
180 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
181 onconnect_data_ = buffer;
182 return ConnectControlSocket();
183 } else {
184 // Can occur if the app is closed before we finish connecting.
185 return true;
186 }
187 } else {
188 state_ = SIGNING_OUT_WAITING;
189 }
190
191 return true;
192 }
193
Close()194 void PeerConnectionClient::Close() {
195 control_socket_->Close();
196 hanging_get_->Close();
197 onconnect_data_.clear();
198 peers_.clear();
199 if (resolver_ != NULL) {
200 resolver_->Destroy(false);
201 resolver_ = NULL;
202 }
203 my_id_ = -1;
204 state_ = NOT_CONNECTED;
205 }
206
ConnectControlSocket()207 bool PeerConnectionClient::ConnectControlSocket() {
208 RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
209 int err = control_socket_->Connect(server_address_);
210 if (err == SOCKET_ERROR) {
211 Close();
212 return false;
213 }
214 return true;
215 }
216
OnConnect(rtc::Socket * socket)217 void PeerConnectionClient::OnConnect(rtc::Socket* socket) {
218 RTC_DCHECK(!onconnect_data_.empty());
219 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
220 RTC_DCHECK(sent == onconnect_data_.length());
221 onconnect_data_.clear();
222 }
223
OnHangingGetConnect(rtc::Socket * socket)224 void PeerConnectionClient::OnHangingGetConnect(rtc::Socket* socket) {
225 char buffer[1024];
226 snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n",
227 my_id_);
228 int len = static_cast<int>(strlen(buffer));
229 int sent = socket->Send(buffer, len);
230 RTC_DCHECK(sent == len);
231 }
232
OnMessageFromPeer(int peer_id,const std::string & message)233 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
234 const std::string& message) {
235 if (message.length() == (sizeof(kByeMessage) - 1) &&
236 message.compare(kByeMessage) == 0) {
237 callback_->OnPeerDisconnected(peer_id);
238 } else {
239 callback_->OnMessageFromPeer(peer_id, message);
240 }
241 }
242
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)243 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
244 size_t eoh,
245 const char* header_pattern,
246 size_t* value) {
247 RTC_DCHECK(value != NULL);
248 size_t found = data.find(header_pattern);
249 if (found != std::string::npos && found < eoh) {
250 *value = atoi(&data[found + strlen(header_pattern)]);
251 return true;
252 }
253 return false;
254 }
255
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)256 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
257 size_t eoh,
258 const char* header_pattern,
259 std::string* value) {
260 RTC_DCHECK(value != NULL);
261 size_t found = data.find(header_pattern);
262 if (found != std::string::npos && found < eoh) {
263 size_t begin = found + strlen(header_pattern);
264 size_t end = data.find("\r\n", begin);
265 if (end == std::string::npos)
266 end = eoh;
267 value->assign(data.substr(begin, end - begin));
268 return true;
269 }
270 return false;
271 }
272
ReadIntoBuffer(rtc::Socket * socket,std::string * data,size_t * content_length)273 bool PeerConnectionClient::ReadIntoBuffer(rtc::Socket* socket,
274 std::string* data,
275 size_t* content_length) {
276 char buffer[0xffff];
277 do {
278 int bytes = socket->Recv(buffer, sizeof(buffer), nullptr);
279 if (bytes <= 0)
280 break;
281 data->append(buffer, bytes);
282 } while (true);
283
284 bool ret = false;
285 size_t i = data->find("\r\n\r\n");
286 if (i != std::string::npos) {
287 RTC_LOG(LS_INFO) << "Headers received";
288 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
289 size_t total_response_size = (i + 4) + *content_length;
290 if (data->length() >= total_response_size) {
291 ret = true;
292 std::string should_close;
293 const char kConnection[] = "\r\nConnection: ";
294 if (GetHeaderValue(*data, i, kConnection, &should_close) &&
295 should_close.compare("close") == 0) {
296 socket->Close();
297 // Since we closed the socket, there was no notification delivered
298 // to us. Compensate by letting ourselves know.
299 OnClose(socket, 0);
300 }
301 } else {
302 // We haven't received everything. Just continue to accept data.
303 }
304 } else {
305 RTC_LOG(LS_ERROR) << "No content length field specified by the server.";
306 }
307 }
308 return ret;
309 }
310
OnRead(rtc::Socket * socket)311 void PeerConnectionClient::OnRead(rtc::Socket* socket) {
312 size_t content_length = 0;
313 if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
314 size_t peer_id = 0, eoh = 0;
315 bool ok =
316 ParseServerResponse(control_data_, content_length, &peer_id, &eoh);
317 if (ok) {
318 if (my_id_ == -1) {
319 // First response. Let's store our server assigned ID.
320 RTC_DCHECK(state_ == SIGNING_IN);
321 my_id_ = static_cast<int>(peer_id);
322 RTC_DCHECK(my_id_ != -1);
323
324 // The body of the response will be a list of already connected peers.
325 if (content_length) {
326 size_t pos = eoh + 4;
327 while (pos < control_data_.size()) {
328 size_t eol = control_data_.find('\n', pos);
329 if (eol == std::string::npos)
330 break;
331 int id = 0;
332 std::string name;
333 bool connected;
334 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
335 &connected) &&
336 id != my_id_) {
337 peers_[id] = name;
338 callback_->OnPeerConnected(id, name);
339 }
340 pos = eol + 1;
341 }
342 }
343 RTC_DCHECK(is_connected());
344 callback_->OnSignedIn();
345 } else if (state_ == SIGNING_OUT) {
346 Close();
347 callback_->OnDisconnected();
348 } else if (state_ == SIGNING_OUT_WAITING) {
349 SignOut();
350 }
351 }
352
353 control_data_.clear();
354
355 if (state_ == SIGNING_IN) {
356 RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
357 state_ = CONNECTED;
358 hanging_get_->Connect(server_address_);
359 }
360 }
361 }
362
OnHangingGetRead(rtc::Socket * socket)363 void PeerConnectionClient::OnHangingGetRead(rtc::Socket* socket) {
364 RTC_LOG(LS_INFO) << __FUNCTION__;
365 size_t content_length = 0;
366 if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) {
367 size_t peer_id = 0, eoh = 0;
368 bool ok =
369 ParseServerResponse(notification_data_, content_length, &peer_id, &eoh);
370
371 if (ok) {
372 // Store the position where the body begins.
373 size_t pos = eoh + 4;
374
375 if (my_id_ == static_cast<int>(peer_id)) {
376 // A notification about a new member or a member that just
377 // disconnected.
378 int id = 0;
379 std::string name;
380 bool connected = false;
381 if (ParseEntry(notification_data_.substr(pos), &name, &id,
382 &connected)) {
383 if (connected) {
384 peers_[id] = name;
385 callback_->OnPeerConnected(id, name);
386 } else {
387 peers_.erase(id);
388 callback_->OnPeerDisconnected(id);
389 }
390 }
391 } else {
392 OnMessageFromPeer(static_cast<int>(peer_id),
393 notification_data_.substr(pos));
394 }
395 }
396
397 notification_data_.clear();
398 }
399
400 if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
401 state_ == CONNECTED) {
402 hanging_get_->Connect(server_address_);
403 }
404 }
405
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)406 bool PeerConnectionClient::ParseEntry(const std::string& entry,
407 std::string* name,
408 int* id,
409 bool* connected) {
410 RTC_DCHECK(name != NULL);
411 RTC_DCHECK(id != NULL);
412 RTC_DCHECK(connected != NULL);
413 RTC_DCHECK(!entry.empty());
414
415 *connected = false;
416 size_t separator = entry.find(',');
417 if (separator != std::string::npos) {
418 *id = atoi(&entry[separator + 1]);
419 name->assign(entry.substr(0, separator));
420 separator = entry.find(',', separator + 1);
421 if (separator != std::string::npos) {
422 *connected = atoi(&entry[separator + 1]) ? true : false;
423 }
424 }
425 return !name->empty();
426 }
427
GetResponseStatus(const std::string & response)428 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
429 int status = -1;
430 size_t pos = response.find(' ');
431 if (pos != std::string::npos)
432 status = atoi(&response[pos + 1]);
433 return status;
434 }
435
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)436 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
437 size_t content_length,
438 size_t* peer_id,
439 size_t* eoh) {
440 int status = GetResponseStatus(response.c_str());
441 if (status != 200) {
442 RTC_LOG(LS_ERROR) << "Received error from server";
443 Close();
444 callback_->OnDisconnected();
445 return false;
446 }
447
448 *eoh = response.find("\r\n\r\n");
449 RTC_DCHECK(*eoh != std::string::npos);
450 if (*eoh == std::string::npos)
451 return false;
452
453 *peer_id = -1;
454
455 // See comment in peer_channel.cc for why we use the Pragma header.
456 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
457
458 return true;
459 }
460
OnClose(rtc::Socket * socket,int err)461 void PeerConnectionClient::OnClose(rtc::Socket* socket, int err) {
462 RTC_LOG(LS_INFO) << __FUNCTION__;
463
464 socket->Close();
465
466 #ifdef WIN32
467 if (err != WSAECONNREFUSED) {
468 #else
469 if (err != ECONNREFUSED) {
470 #endif
471 if (socket == hanging_get_.get()) {
472 if (state_ == CONNECTED) {
473 hanging_get_->Close();
474 hanging_get_->Connect(server_address_);
475 }
476 } else {
477 callback_->OnMessageSent(err);
478 }
479 } else {
480 if (socket == control_socket_.get()) {
481 RTC_LOG(LS_WARNING) << "Connection refused; retrying in 2 seconds";
482 rtc::Thread::Current()->PostDelayedTask(
483 SafeTask(safety_.flag(), [this] { DoConnect(); }), kReconnectDelay);
484 } else {
485 Close();
486 callback_->OnDisconnected();
487 }
488 }
489 }
490