1 /*
2 * Copyright 2012 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "examples/peerconnection/client/peer_connection_client.h"
12
13 #include "examples/peerconnection/client/defaults.h"
14 #include "rtc_base/checks.h"
15 #include "rtc_base/logging.h"
16 #include "rtc_base/net_helpers.h"
17
18 #ifdef WIN32
19 #include "rtc_base/win32_socket_server.h"
20 #endif
21
22 namespace {
23
// Message body that a peer sends to announce it is hanging up.
constexpr char kByeMessage[] = "BYE";
// How long to wait before retrying a refused server connection, in
// milliseconds.
constexpr int kReconnectDelay = 2000;
28
CreateClientSocket(int family)29 rtc::AsyncSocket* CreateClientSocket(int family) {
30 #ifdef WIN32
31 rtc::Win32Socket* sock = new rtc::Win32Socket();
32 sock->CreateT(family, SOCK_STREAM);
33 return sock;
34 #elif defined(WEBRTC_POSIX)
35 rtc::Thread* thread = rtc::Thread::Current();
36 RTC_DCHECK(thread != NULL);
37 return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
38 #else
39 #error Platform not supported.
40 #endif
41 }
42
43 } // namespace
44
PeerConnectionClient()45 PeerConnectionClient::PeerConnectionClient()
46 : callback_(NULL), resolver_(NULL), state_(NOT_CONNECTED), my_id_(-1) {}
47
~PeerConnectionClient()48 PeerConnectionClient::~PeerConnectionClient() {}
49
InitSocketSignals()50 void PeerConnectionClient::InitSocketSignals() {
51 RTC_DCHECK(control_socket_.get() != NULL);
52 RTC_DCHECK(hanging_get_.get() != NULL);
53 control_socket_->SignalCloseEvent.connect(this,
54 &PeerConnectionClient::OnClose);
55 hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose);
56 control_socket_->SignalConnectEvent.connect(this,
57 &PeerConnectionClient::OnConnect);
58 hanging_get_->SignalConnectEvent.connect(
59 this, &PeerConnectionClient::OnHangingGetConnect);
60 control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead);
61 hanging_get_->SignalReadEvent.connect(
62 this, &PeerConnectionClient::OnHangingGetRead);
63 }
64
id() const65 int PeerConnectionClient::id() const {
66 return my_id_;
67 }
68
is_connected() const69 bool PeerConnectionClient::is_connected() const {
70 return my_id_ != -1;
71 }
72
peers() const73 const Peers& PeerConnectionClient::peers() const {
74 return peers_;
75 }
76
RegisterObserver(PeerConnectionClientObserver * callback)77 void PeerConnectionClient::RegisterObserver(
78 PeerConnectionClientObserver* callback) {
79 RTC_DCHECK(!callback_);
80 callback_ = callback;
81 }
82
Connect(const std::string & server,int port,const std::string & client_name)83 void PeerConnectionClient::Connect(const std::string& server,
84 int port,
85 const std::string& client_name) {
86 RTC_DCHECK(!server.empty());
87 RTC_DCHECK(!client_name.empty());
88
89 if (state_ != NOT_CONNECTED) {
90 RTC_LOG(WARNING)
91 << "The client must not be connected before you can call Connect()";
92 callback_->OnServerConnectionFailure();
93 return;
94 }
95
96 if (server.empty() || client_name.empty()) {
97 callback_->OnServerConnectionFailure();
98 return;
99 }
100
101 if (port <= 0)
102 port = kDefaultServerPort;
103
104 server_address_.SetIP(server);
105 server_address_.SetPort(port);
106 client_name_ = client_name;
107
108 if (server_address_.IsUnresolvedIP()) {
109 state_ = RESOLVING;
110 resolver_ = new rtc::AsyncResolver();
111 resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
112 resolver_->Start(server_address_);
113 } else {
114 DoConnect();
115 }
116 }
117
OnResolveResult(rtc::AsyncResolverInterface * resolver)118 void PeerConnectionClient::OnResolveResult(
119 rtc::AsyncResolverInterface* resolver) {
120 if (resolver_->GetError() != 0) {
121 callback_->OnServerConnectionFailure();
122 resolver_->Destroy(false);
123 resolver_ = NULL;
124 state_ = NOT_CONNECTED;
125 } else {
126 server_address_ = resolver_->address();
127 DoConnect();
128 }
129 }
130
DoConnect()131 void PeerConnectionClient::DoConnect() {
132 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
133 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
134 InitSocketSignals();
135 char buffer[1024];
136 snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n",
137 client_name_.c_str());
138 onconnect_data_ = buffer;
139
140 bool ret = ConnectControlSocket();
141 if (ret)
142 state_ = SIGNING_IN;
143 if (!ret) {
144 callback_->OnServerConnectionFailure();
145 }
146 }
147
SendToPeer(int peer_id,const std::string & message)148 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
149 if (state_ != CONNECTED)
150 return false;
151
152 RTC_DCHECK(is_connected());
153 RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
154 if (!is_connected() || peer_id == -1)
155 return false;
156
157 char headers[1024];
158 snprintf(headers, sizeof(headers),
159 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
160 "Content-Length: %zu\r\n"
161 "Content-Type: text/plain\r\n"
162 "\r\n",
163 my_id_, peer_id, message.length());
164 onconnect_data_ = headers;
165 onconnect_data_ += message;
166 return ConnectControlSocket();
167 }
168
SendHangUp(int peer_id)169 bool PeerConnectionClient::SendHangUp(int peer_id) {
170 return SendToPeer(peer_id, kByeMessage);
171 }
172
IsSendingMessage()173 bool PeerConnectionClient::IsSendingMessage() {
174 return state_ == CONNECTED &&
175 control_socket_->GetState() != rtc::Socket::CS_CLOSED;
176 }
177
SignOut()178 bool PeerConnectionClient::SignOut() {
179 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
180 return true;
181
182 if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
183 hanging_get_->Close();
184
185 if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
186 state_ = SIGNING_OUT;
187
188 if (my_id_ != -1) {
189 char buffer[1024];
190 snprintf(buffer, sizeof(buffer),
191 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
192 onconnect_data_ = buffer;
193 return ConnectControlSocket();
194 } else {
195 // Can occur if the app is closed before we finish connecting.
196 return true;
197 }
198 } else {
199 state_ = SIGNING_OUT_WAITING;
200 }
201
202 return true;
203 }
204
Close()205 void PeerConnectionClient::Close() {
206 control_socket_->Close();
207 hanging_get_->Close();
208 onconnect_data_.clear();
209 peers_.clear();
210 if (resolver_ != NULL) {
211 resolver_->Destroy(false);
212 resolver_ = NULL;
213 }
214 my_id_ = -1;
215 state_ = NOT_CONNECTED;
216 }
217
ConnectControlSocket()218 bool PeerConnectionClient::ConnectControlSocket() {
219 RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
220 int err = control_socket_->Connect(server_address_);
221 if (err == SOCKET_ERROR) {
222 Close();
223 return false;
224 }
225 return true;
226 }
227
OnConnect(rtc::AsyncSocket * socket)228 void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
229 RTC_DCHECK(!onconnect_data_.empty());
230 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
231 RTC_DCHECK(sent == onconnect_data_.length());
232 onconnect_data_.clear();
233 }
234
OnHangingGetConnect(rtc::AsyncSocket * socket)235 void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
236 char buffer[1024];
237 snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n",
238 my_id_);
239 int len = static_cast<int>(strlen(buffer));
240 int sent = socket->Send(buffer, len);
241 RTC_DCHECK(sent == len);
242 }
243
OnMessageFromPeer(int peer_id,const std::string & message)244 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
245 const std::string& message) {
246 if (message.length() == (sizeof(kByeMessage) - 1) &&
247 message.compare(kByeMessage) == 0) {
248 callback_->OnPeerDisconnected(peer_id);
249 } else {
250 callback_->OnMessageFromPeer(peer_id, message);
251 }
252 }
253
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)254 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
255 size_t eoh,
256 const char* header_pattern,
257 size_t* value) {
258 RTC_DCHECK(value != NULL);
259 size_t found = data.find(header_pattern);
260 if (found != std::string::npos && found < eoh) {
261 *value = atoi(&data[found + strlen(header_pattern)]);
262 return true;
263 }
264 return false;
265 }
266
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)267 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
268 size_t eoh,
269 const char* header_pattern,
270 std::string* value) {
271 RTC_DCHECK(value != NULL);
272 size_t found = data.find(header_pattern);
273 if (found != std::string::npos && found < eoh) {
274 size_t begin = found + strlen(header_pattern);
275 size_t end = data.find("\r\n", begin);
276 if (end == std::string::npos)
277 end = eoh;
278 value->assign(data.substr(begin, end - begin));
279 return true;
280 }
281 return false;
282 }
283
ReadIntoBuffer(rtc::AsyncSocket * socket,std::string * data,size_t * content_length)284 bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
285 std::string* data,
286 size_t* content_length) {
287 char buffer[0xffff];
288 do {
289 int bytes = socket->Recv(buffer, sizeof(buffer), nullptr);
290 if (bytes <= 0)
291 break;
292 data->append(buffer, bytes);
293 } while (true);
294
295 bool ret = false;
296 size_t i = data->find("\r\n\r\n");
297 if (i != std::string::npos) {
298 RTC_LOG(INFO) << "Headers received";
299 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
300 size_t total_response_size = (i + 4) + *content_length;
301 if (data->length() >= total_response_size) {
302 ret = true;
303 std::string should_close;
304 const char kConnection[] = "\r\nConnection: ";
305 if (GetHeaderValue(*data, i, kConnection, &should_close) &&
306 should_close.compare("close") == 0) {
307 socket->Close();
308 // Since we closed the socket, there was no notification delivered
309 // to us. Compensate by letting ourselves know.
310 OnClose(socket, 0);
311 }
312 } else {
313 // We haven't received everything. Just continue to accept data.
314 }
315 } else {
316 RTC_LOG(LS_ERROR) << "No content length field specified by the server.";
317 }
318 }
319 return ret;
320 }
321
OnRead(rtc::AsyncSocket * socket)322 void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
323 size_t content_length = 0;
324 if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
325 size_t peer_id = 0, eoh = 0;
326 bool ok =
327 ParseServerResponse(control_data_, content_length, &peer_id, &eoh);
328 if (ok) {
329 if (my_id_ == -1) {
330 // First response. Let's store our server assigned ID.
331 RTC_DCHECK(state_ == SIGNING_IN);
332 my_id_ = static_cast<int>(peer_id);
333 RTC_DCHECK(my_id_ != -1);
334
335 // The body of the response will be a list of already connected peers.
336 if (content_length) {
337 size_t pos = eoh + 4;
338 while (pos < control_data_.size()) {
339 size_t eol = control_data_.find('\n', pos);
340 if (eol == std::string::npos)
341 break;
342 int id = 0;
343 std::string name;
344 bool connected;
345 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
346 &connected) &&
347 id != my_id_) {
348 peers_[id] = name;
349 callback_->OnPeerConnected(id, name);
350 }
351 pos = eol + 1;
352 }
353 }
354 RTC_DCHECK(is_connected());
355 callback_->OnSignedIn();
356 } else if (state_ == SIGNING_OUT) {
357 Close();
358 callback_->OnDisconnected();
359 } else if (state_ == SIGNING_OUT_WAITING) {
360 SignOut();
361 }
362 }
363
364 control_data_.clear();
365
366 if (state_ == SIGNING_IN) {
367 RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
368 state_ = CONNECTED;
369 hanging_get_->Connect(server_address_);
370 }
371 }
372 }
373
OnHangingGetRead(rtc::AsyncSocket * socket)374 void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
375 RTC_LOG(INFO) << __FUNCTION__;
376 size_t content_length = 0;
377 if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) {
378 size_t peer_id = 0, eoh = 0;
379 bool ok =
380 ParseServerResponse(notification_data_, content_length, &peer_id, &eoh);
381
382 if (ok) {
383 // Store the position where the body begins.
384 size_t pos = eoh + 4;
385
386 if (my_id_ == static_cast<int>(peer_id)) {
387 // A notification about a new member or a member that just
388 // disconnected.
389 int id = 0;
390 std::string name;
391 bool connected = false;
392 if (ParseEntry(notification_data_.substr(pos), &name, &id,
393 &connected)) {
394 if (connected) {
395 peers_[id] = name;
396 callback_->OnPeerConnected(id, name);
397 } else {
398 peers_.erase(id);
399 callback_->OnPeerDisconnected(id);
400 }
401 }
402 } else {
403 OnMessageFromPeer(static_cast<int>(peer_id),
404 notification_data_.substr(pos));
405 }
406 }
407
408 notification_data_.clear();
409 }
410
411 if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
412 state_ == CONNECTED) {
413 hanging_get_->Connect(server_address_);
414 }
415 }
416
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)417 bool PeerConnectionClient::ParseEntry(const std::string& entry,
418 std::string* name,
419 int* id,
420 bool* connected) {
421 RTC_DCHECK(name != NULL);
422 RTC_DCHECK(id != NULL);
423 RTC_DCHECK(connected != NULL);
424 RTC_DCHECK(!entry.empty());
425
426 *connected = false;
427 size_t separator = entry.find(',');
428 if (separator != std::string::npos) {
429 *id = atoi(&entry[separator + 1]);
430 name->assign(entry.substr(0, separator));
431 separator = entry.find(',', separator + 1);
432 if (separator != std::string::npos) {
433 *connected = atoi(&entry[separator + 1]) ? true : false;
434 }
435 }
436 return !name->empty();
437 }
438
GetResponseStatus(const std::string & response)439 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
440 int status = -1;
441 size_t pos = response.find(' ');
442 if (pos != std::string::npos)
443 status = atoi(&response[pos + 1]);
444 return status;
445 }
446
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)447 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
448 size_t content_length,
449 size_t* peer_id,
450 size_t* eoh) {
451 int status = GetResponseStatus(response.c_str());
452 if (status != 200) {
453 RTC_LOG(LS_ERROR) << "Received error from server";
454 Close();
455 callback_->OnDisconnected();
456 return false;
457 }
458
459 *eoh = response.find("\r\n\r\n");
460 RTC_DCHECK(*eoh != std::string::npos);
461 if (*eoh == std::string::npos)
462 return false;
463
464 *peer_id = -1;
465
466 // See comment in peer_channel.cc for why we use the Pragma header and
467 // not e.g. "X-Peer-Id".
468 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
469
470 return true;
471 }
472
OnClose(rtc::AsyncSocket * socket,int err)473 void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
474 RTC_LOG(INFO) << __FUNCTION__;
475
476 socket->Close();
477
478 #ifdef WIN32
479 if (err != WSAECONNREFUSED) {
480 #else
481 if (err != ECONNREFUSED) {
482 #endif
483 if (socket == hanging_get_.get()) {
484 if (state_ == CONNECTED) {
485 hanging_get_->Close();
486 hanging_get_->Connect(server_address_);
487 }
488 } else {
489 callback_->OnMessageSent(err);
490 }
491 } else {
492 if (socket == control_socket_.get()) {
493 RTC_LOG(WARNING) << "Connection refused; retrying in 2 seconds";
494 rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, kReconnectDelay, this,
495 0);
496 } else {
497 Close();
498 callback_->OnDisconnected();
499 }
500 }
501 }
502
503 void PeerConnectionClient::OnMessage(rtc::Message* msg) {
504 // ignore msg; there is currently only one supported message ("retry")
505 DoConnect();
506 }
507