1 /*
2 * Copyright 2012 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/examples/peerconnection/client/peer_connection_client.h"
12
13 #include "webrtc/examples/peerconnection/client/defaults.h"
14 #include "webrtc/base/common.h"
15 #include "webrtc/base/logging.h"
16 #include "webrtc/base/nethelpers.h"
17 #include "webrtc/base/stringutils.h"
18
19 #ifdef WIN32
20 #include "webrtc/base/win32socketserver.h"
21 #endif
22
23 using rtc::sprintfn;
24
25 namespace {
26
27 // This is our magical hangup signal.
28 const char kByeMessage[] = "BYE";
29 // Delay between server connection retries, in milliseconds
30 const int kReconnectDelay = 2000;
31
CreateClientSocket(int family)32 rtc::AsyncSocket* CreateClientSocket(int family) {
33 #ifdef WIN32
34 rtc::Win32Socket* sock = new rtc::Win32Socket();
35 sock->CreateT(family, SOCK_STREAM);
36 return sock;
37 #elif defined(WEBRTC_POSIX)
38 rtc::Thread* thread = rtc::Thread::Current();
39 ASSERT(thread != NULL);
40 return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
41 #else
42 #error Platform not supported.
43 #endif
44 }
45
46 } // namespace
47
PeerConnectionClient()48 PeerConnectionClient::PeerConnectionClient()
49 : callback_(NULL),
50 resolver_(NULL),
51 state_(NOT_CONNECTED),
52 my_id_(-1) {
53 }
54
~PeerConnectionClient()55 PeerConnectionClient::~PeerConnectionClient() {
56 }
57
InitSocketSignals()58 void PeerConnectionClient::InitSocketSignals() {
59 ASSERT(control_socket_.get() != NULL);
60 ASSERT(hanging_get_.get() != NULL);
61 control_socket_->SignalCloseEvent.connect(this,
62 &PeerConnectionClient::OnClose);
63 hanging_get_->SignalCloseEvent.connect(this,
64 &PeerConnectionClient::OnClose);
65 control_socket_->SignalConnectEvent.connect(this,
66 &PeerConnectionClient::OnConnect);
67 hanging_get_->SignalConnectEvent.connect(this,
68 &PeerConnectionClient::OnHangingGetConnect);
69 control_socket_->SignalReadEvent.connect(this,
70 &PeerConnectionClient::OnRead);
71 hanging_get_->SignalReadEvent.connect(this,
72 &PeerConnectionClient::OnHangingGetRead);
73 }
74
id() const75 int PeerConnectionClient::id() const {
76 return my_id_;
77 }
78
is_connected() const79 bool PeerConnectionClient::is_connected() const {
80 return my_id_ != -1;
81 }
82
peers() const83 const Peers& PeerConnectionClient::peers() const {
84 return peers_;
85 }
86
RegisterObserver(PeerConnectionClientObserver * callback)87 void PeerConnectionClient::RegisterObserver(
88 PeerConnectionClientObserver* callback) {
89 ASSERT(!callback_);
90 callback_ = callback;
91 }
92
Connect(const std::string & server,int port,const std::string & client_name)93 void PeerConnectionClient::Connect(const std::string& server, int port,
94 const std::string& client_name) {
95 ASSERT(!server.empty());
96 ASSERT(!client_name.empty());
97
98 if (state_ != NOT_CONNECTED) {
99 LOG(WARNING)
100 << "The client must not be connected before you can call Connect()";
101 callback_->OnServerConnectionFailure();
102 return;
103 }
104
105 if (server.empty() || client_name.empty()) {
106 callback_->OnServerConnectionFailure();
107 return;
108 }
109
110 if (port <= 0)
111 port = kDefaultServerPort;
112
113 server_address_.SetIP(server);
114 server_address_.SetPort(port);
115 client_name_ = client_name;
116
117 if (server_address_.IsUnresolvedIP()) {
118 state_ = RESOLVING;
119 resolver_ = new rtc::AsyncResolver();
120 resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
121 resolver_->Start(server_address_);
122 } else {
123 DoConnect();
124 }
125 }
126
OnResolveResult(rtc::AsyncResolverInterface * resolver)127 void PeerConnectionClient::OnResolveResult(
128 rtc::AsyncResolverInterface* resolver) {
129 if (resolver_->GetError() != 0) {
130 callback_->OnServerConnectionFailure();
131 resolver_->Destroy(false);
132 resolver_ = NULL;
133 state_ = NOT_CONNECTED;
134 } else {
135 server_address_ = resolver_->address();
136 DoConnect();
137 }
138 }
139
DoConnect()140 void PeerConnectionClient::DoConnect() {
141 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
142 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
143 InitSocketSignals();
144 char buffer[1024];
145 sprintfn(buffer, sizeof(buffer),
146 "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
147 onconnect_data_ = buffer;
148
149 bool ret = ConnectControlSocket();
150 if (ret)
151 state_ = SIGNING_IN;
152 if (!ret) {
153 callback_->OnServerConnectionFailure();
154 }
155 }
156
SendToPeer(int peer_id,const std::string & message)157 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
158 if (state_ != CONNECTED)
159 return false;
160
161 ASSERT(is_connected());
162 ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
163 if (!is_connected() || peer_id == -1)
164 return false;
165
166 char headers[1024];
167 sprintfn(headers, sizeof(headers),
168 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
169 "Content-Length: %i\r\n"
170 "Content-Type: text/plain\r\n"
171 "\r\n",
172 my_id_, peer_id, message.length());
173 onconnect_data_ = headers;
174 onconnect_data_ += message;
175 return ConnectControlSocket();
176 }
177
SendHangUp(int peer_id)178 bool PeerConnectionClient::SendHangUp(int peer_id) {
179 return SendToPeer(peer_id, kByeMessage);
180 }
181
IsSendingMessage()182 bool PeerConnectionClient::IsSendingMessage() {
183 return state_ == CONNECTED &&
184 control_socket_->GetState() != rtc::Socket::CS_CLOSED;
185 }
186
SignOut()187 bool PeerConnectionClient::SignOut() {
188 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
189 return true;
190
191 if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
192 hanging_get_->Close();
193
194 if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
195 state_ = SIGNING_OUT;
196
197 if (my_id_ != -1) {
198 char buffer[1024];
199 sprintfn(buffer, sizeof(buffer),
200 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
201 onconnect_data_ = buffer;
202 return ConnectControlSocket();
203 } else {
204 // Can occur if the app is closed before we finish connecting.
205 return true;
206 }
207 } else {
208 state_ = SIGNING_OUT_WAITING;
209 }
210
211 return true;
212 }
213
Close()214 void PeerConnectionClient::Close() {
215 control_socket_->Close();
216 hanging_get_->Close();
217 onconnect_data_.clear();
218 peers_.clear();
219 if (resolver_ != NULL) {
220 resolver_->Destroy(false);
221 resolver_ = NULL;
222 }
223 my_id_ = -1;
224 state_ = NOT_CONNECTED;
225 }
226
ConnectControlSocket()227 bool PeerConnectionClient::ConnectControlSocket() {
228 ASSERT(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
229 int err = control_socket_->Connect(server_address_);
230 if (err == SOCKET_ERROR) {
231 Close();
232 return false;
233 }
234 return true;
235 }
236
OnConnect(rtc::AsyncSocket * socket)237 void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
238 ASSERT(!onconnect_data_.empty());
239 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
240 ASSERT(sent == onconnect_data_.length());
241 RTC_UNUSED(sent);
242 onconnect_data_.clear();
243 }
244
OnHangingGetConnect(rtc::AsyncSocket * socket)245 void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
246 char buffer[1024];
247 sprintfn(buffer, sizeof(buffer),
248 "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
249 int len = static_cast<int>(strlen(buffer));
250 int sent = socket->Send(buffer, len);
251 ASSERT(sent == len);
252 RTC_UNUSED2(sent, len);
253 }
254
OnMessageFromPeer(int peer_id,const std::string & message)255 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
256 const std::string& message) {
257 if (message.length() == (sizeof(kByeMessage) - 1) &&
258 message.compare(kByeMessage) == 0) {
259 callback_->OnPeerDisconnected(peer_id);
260 } else {
261 callback_->OnMessageFromPeer(peer_id, message);
262 }
263 }
264
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)265 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
266 size_t eoh,
267 const char* header_pattern,
268 size_t* value) {
269 ASSERT(value != NULL);
270 size_t found = data.find(header_pattern);
271 if (found != std::string::npos && found < eoh) {
272 *value = atoi(&data[found + strlen(header_pattern)]);
273 return true;
274 }
275 return false;
276 }
277
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)278 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
279 const char* header_pattern,
280 std::string* value) {
281 ASSERT(value != NULL);
282 size_t found = data.find(header_pattern);
283 if (found != std::string::npos && found < eoh) {
284 size_t begin = found + strlen(header_pattern);
285 size_t end = data.find("\r\n", begin);
286 if (end == std::string::npos)
287 end = eoh;
288 value->assign(data.substr(begin, end - begin));
289 return true;
290 }
291 return false;
292 }
293
ReadIntoBuffer(rtc::AsyncSocket * socket,std::string * data,size_t * content_length)294 bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
295 std::string* data,
296 size_t* content_length) {
297 char buffer[0xffff];
298 do {
299 int bytes = socket->Recv(buffer, sizeof(buffer));
300 if (bytes <= 0)
301 break;
302 data->append(buffer, bytes);
303 } while (true);
304
305 bool ret = false;
306 size_t i = data->find("\r\n\r\n");
307 if (i != std::string::npos) {
308 LOG(INFO) << "Headers received";
309 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
310 size_t total_response_size = (i + 4) + *content_length;
311 if (data->length() >= total_response_size) {
312 ret = true;
313 std::string should_close;
314 const char kConnection[] = "\r\nConnection: ";
315 if (GetHeaderValue(*data, i, kConnection, &should_close) &&
316 should_close.compare("close") == 0) {
317 socket->Close();
318 // Since we closed the socket, there was no notification delivered
319 // to us. Compensate by letting ourselves know.
320 OnClose(socket, 0);
321 }
322 } else {
323 // We haven't received everything. Just continue to accept data.
324 }
325 } else {
326 LOG(LS_ERROR) << "No content length field specified by the server.";
327 }
328 }
329 return ret;
330 }
331
OnRead(rtc::AsyncSocket * socket)332 void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
333 size_t content_length = 0;
334 if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
335 size_t peer_id = 0, eoh = 0;
336 bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
337 &eoh);
338 if (ok) {
339 if (my_id_ == -1) {
340 // First response. Let's store our server assigned ID.
341 ASSERT(state_ == SIGNING_IN);
342 my_id_ = static_cast<int>(peer_id);
343 ASSERT(my_id_ != -1);
344
345 // The body of the response will be a list of already connected peers.
346 if (content_length) {
347 size_t pos = eoh + 4;
348 while (pos < control_data_.size()) {
349 size_t eol = control_data_.find('\n', pos);
350 if (eol == std::string::npos)
351 break;
352 int id = 0;
353 std::string name;
354 bool connected;
355 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
356 &connected) && id != my_id_) {
357 peers_[id] = name;
358 callback_->OnPeerConnected(id, name);
359 }
360 pos = eol + 1;
361 }
362 }
363 ASSERT(is_connected());
364 callback_->OnSignedIn();
365 } else if (state_ == SIGNING_OUT) {
366 Close();
367 callback_->OnDisconnected();
368 } else if (state_ == SIGNING_OUT_WAITING) {
369 SignOut();
370 }
371 }
372
373 control_data_.clear();
374
375 if (state_ == SIGNING_IN) {
376 ASSERT(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
377 state_ = CONNECTED;
378 hanging_get_->Connect(server_address_);
379 }
380 }
381 }
382
OnHangingGetRead(rtc::AsyncSocket * socket)383 void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
384 LOG(INFO) << __FUNCTION__;
385 size_t content_length = 0;
386 if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) {
387 size_t peer_id = 0, eoh = 0;
388 bool ok = ParseServerResponse(notification_data_, content_length,
389 &peer_id, &eoh);
390
391 if (ok) {
392 // Store the position where the body begins.
393 size_t pos = eoh + 4;
394
395 if (my_id_ == static_cast<int>(peer_id)) {
396 // A notification about a new member or a member that just
397 // disconnected.
398 int id = 0;
399 std::string name;
400 bool connected = false;
401 if (ParseEntry(notification_data_.substr(pos), &name, &id,
402 &connected)) {
403 if (connected) {
404 peers_[id] = name;
405 callback_->OnPeerConnected(id, name);
406 } else {
407 peers_.erase(id);
408 callback_->OnPeerDisconnected(id);
409 }
410 }
411 } else {
412 OnMessageFromPeer(static_cast<int>(peer_id),
413 notification_data_.substr(pos));
414 }
415 }
416
417 notification_data_.clear();
418 }
419
420 if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
421 state_ == CONNECTED) {
422 hanging_get_->Connect(server_address_);
423 }
424 }
425
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)426 bool PeerConnectionClient::ParseEntry(const std::string& entry,
427 std::string* name,
428 int* id,
429 bool* connected) {
430 ASSERT(name != NULL);
431 ASSERT(id != NULL);
432 ASSERT(connected != NULL);
433 ASSERT(!entry.empty());
434
435 *connected = false;
436 size_t separator = entry.find(',');
437 if (separator != std::string::npos) {
438 *id = atoi(&entry[separator + 1]);
439 name->assign(entry.substr(0, separator));
440 separator = entry.find(',', separator + 1);
441 if (separator != std::string::npos) {
442 *connected = atoi(&entry[separator + 1]) ? true : false;
443 }
444 }
445 return !name->empty();
446 }
447
GetResponseStatus(const std::string & response)448 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
449 int status = -1;
450 size_t pos = response.find(' ');
451 if (pos != std::string::npos)
452 status = atoi(&response[pos + 1]);
453 return status;
454 }
455
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)456 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
457 size_t content_length,
458 size_t* peer_id,
459 size_t* eoh) {
460 int status = GetResponseStatus(response.c_str());
461 if (status != 200) {
462 LOG(LS_ERROR) << "Received error from server";
463 Close();
464 callback_->OnDisconnected();
465 return false;
466 }
467
468 *eoh = response.find("\r\n\r\n");
469 ASSERT(*eoh != std::string::npos);
470 if (*eoh == std::string::npos)
471 return false;
472
473 *peer_id = -1;
474
475 // See comment in peer_channel.cc for why we use the Pragma header and
476 // not e.g. "X-Peer-Id".
477 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
478
479 return true;
480 }
481
OnClose(rtc::AsyncSocket * socket,int err)482 void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
483 LOG(INFO) << __FUNCTION__;
484
485 socket->Close();
486
487 #ifdef WIN32
488 if (err != WSAECONNREFUSED) {
489 #else
490 if (err != ECONNREFUSED) {
491 #endif
492 if (socket == hanging_get_.get()) {
493 if (state_ == CONNECTED) {
494 hanging_get_->Close();
495 hanging_get_->Connect(server_address_);
496 }
497 } else {
498 callback_->OnMessageSent(err);
499 }
500 } else {
501 if (socket == control_socket_.get()) {
502 LOG(WARNING) << "Connection refused; retrying in 2 seconds";
503 rtc::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
504 } else {
505 Close();
506 callback_->OnDisconnected();
507 }
508 }
509 }
510
511 void PeerConnectionClient::OnMessage(rtc::Message* msg) {
512 // ignore msg; there is currently only one supported message ("retry")
513 DoConnect();
514 }
515