1 /*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include "talk/examples/peerconnection/client/peer_connection_client.h"
29
30 #include "talk/examples/peerconnection/client/defaults.h"
31 #include "talk/base/common.h"
32 #include "talk/base/nethelpers.h"
33 #include "talk/base/logging.h"
34 #include "talk/base/stringutils.h"
35
36 #ifdef WIN32
37 #include "talk/base/win32socketserver.h"
38 #endif
39
40 using talk_base::sprintfn;
41
42 namespace {
43
44 // This is our magical hangup signal.
45 const char kByeMessage[] = "BYE";
46 // Delay between server connection retries, in milliseconds
47 const int kReconnectDelay = 2000;
48
CreateClientSocket(int family)49 talk_base::AsyncSocket* CreateClientSocket(int family) {
50 #ifdef WIN32
51 talk_base::Win32Socket* sock = new talk_base::Win32Socket();
52 sock->CreateT(family, SOCK_STREAM);
53 return sock;
54 #elif defined(POSIX)
55 talk_base::Thread* thread = talk_base::Thread::Current();
56 ASSERT(thread != NULL);
57 return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
58 #else
59 #error Platform not supported.
60 #endif
61 }
62
63 }
64
PeerConnectionClient()65 PeerConnectionClient::PeerConnectionClient()
66 : callback_(NULL),
67 resolver_(NULL),
68 state_(NOT_CONNECTED),
69 my_id_(-1) {
70 }
71
~PeerConnectionClient()72 PeerConnectionClient::~PeerConnectionClient() {
73 }
74
InitSocketSignals()75 void PeerConnectionClient::InitSocketSignals() {
76 ASSERT(control_socket_.get() != NULL);
77 ASSERT(hanging_get_.get() != NULL);
78 control_socket_->SignalCloseEvent.connect(this,
79 &PeerConnectionClient::OnClose);
80 hanging_get_->SignalCloseEvent.connect(this,
81 &PeerConnectionClient::OnClose);
82 control_socket_->SignalConnectEvent.connect(this,
83 &PeerConnectionClient::OnConnect);
84 hanging_get_->SignalConnectEvent.connect(this,
85 &PeerConnectionClient::OnHangingGetConnect);
86 control_socket_->SignalReadEvent.connect(this,
87 &PeerConnectionClient::OnRead);
88 hanging_get_->SignalReadEvent.connect(this,
89 &PeerConnectionClient::OnHangingGetRead);
90 }
91
id() const92 int PeerConnectionClient::id() const {
93 return my_id_;
94 }
95
is_connected() const96 bool PeerConnectionClient::is_connected() const {
97 return my_id_ != -1;
98 }
99
peers() const100 const Peers& PeerConnectionClient::peers() const {
101 return peers_;
102 }
103
RegisterObserver(PeerConnectionClientObserver * callback)104 void PeerConnectionClient::RegisterObserver(
105 PeerConnectionClientObserver* callback) {
106 ASSERT(!callback_);
107 callback_ = callback;
108 }
109
Connect(const std::string & server,int port,const std::string & client_name)110 void PeerConnectionClient::Connect(const std::string& server, int port,
111 const std::string& client_name) {
112 ASSERT(!server.empty());
113 ASSERT(!client_name.empty());
114
115 if (state_ != NOT_CONNECTED) {
116 LOG(WARNING)
117 << "The client must not be connected before you can call Connect()";
118 callback_->OnServerConnectionFailure();
119 return;
120 }
121
122 if (server.empty() || client_name.empty()) {
123 callback_->OnServerConnectionFailure();
124 return;
125 }
126
127 if (port <= 0)
128 port = kDefaultServerPort;
129
130 server_address_.SetIP(server);
131 server_address_.SetPort(port);
132 client_name_ = client_name;
133
134 if (server_address_.IsUnresolved()) {
135 state_ = RESOLVING;
136 resolver_ = new talk_base::AsyncResolver();
137 resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
138 resolver_->Start(server_address_);
139 } else {
140 DoConnect();
141 }
142 }
143
OnResolveResult(talk_base::AsyncResolverInterface * resolver)144 void PeerConnectionClient::OnResolveResult(
145 talk_base::AsyncResolverInterface* resolver) {
146 if (resolver_->GetError() != 0) {
147 callback_->OnServerConnectionFailure();
148 resolver_->Destroy(false);
149 resolver_ = NULL;
150 state_ = NOT_CONNECTED;
151 } else {
152 server_address_ = resolver_->address();
153 DoConnect();
154 }
155 }
156
DoConnect()157 void PeerConnectionClient::DoConnect() {
158 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
159 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
160 InitSocketSignals();
161 char buffer[1024];
162 sprintfn(buffer, sizeof(buffer),
163 "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
164 onconnect_data_ = buffer;
165
166 bool ret = ConnectControlSocket();
167 if (ret)
168 state_ = SIGNING_IN;
169 if (!ret) {
170 callback_->OnServerConnectionFailure();
171 }
172 }
173
SendToPeer(int peer_id,const std::string & message)174 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
175 if (state_ != CONNECTED)
176 return false;
177
178 ASSERT(is_connected());
179 ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
180 if (!is_connected() || peer_id == -1)
181 return false;
182
183 char headers[1024];
184 sprintfn(headers, sizeof(headers),
185 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
186 "Content-Length: %i\r\n"
187 "Content-Type: text/plain\r\n"
188 "\r\n",
189 my_id_, peer_id, message.length());
190 onconnect_data_ = headers;
191 onconnect_data_ += message;
192 return ConnectControlSocket();
193 }
194
SendHangUp(int peer_id)195 bool PeerConnectionClient::SendHangUp(int peer_id) {
196 return SendToPeer(peer_id, kByeMessage);
197 }
198
IsSendingMessage()199 bool PeerConnectionClient::IsSendingMessage() {
200 return state_ == CONNECTED &&
201 control_socket_->GetState() != talk_base::Socket::CS_CLOSED;
202 }
203
SignOut()204 bool PeerConnectionClient::SignOut() {
205 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
206 return true;
207
208 if (hanging_get_->GetState() != talk_base::Socket::CS_CLOSED)
209 hanging_get_->Close();
210
211 if (control_socket_->GetState() == talk_base::Socket::CS_CLOSED) {
212 state_ = SIGNING_OUT;
213
214 if (my_id_ != -1) {
215 char buffer[1024];
216 sprintfn(buffer, sizeof(buffer),
217 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
218 onconnect_data_ = buffer;
219 return ConnectControlSocket();
220 } else {
221 // Can occur if the app is closed before we finish connecting.
222 return true;
223 }
224 } else {
225 state_ = SIGNING_OUT_WAITING;
226 }
227
228 return true;
229 }
230
Close()231 void PeerConnectionClient::Close() {
232 control_socket_->Close();
233 hanging_get_->Close();
234 onconnect_data_.clear();
235 peers_.clear();
236 if (resolver_ != NULL) {
237 resolver_->Destroy(false);
238 resolver_ = NULL;
239 }
240 my_id_ = -1;
241 state_ = NOT_CONNECTED;
242 }
243
ConnectControlSocket()244 bool PeerConnectionClient::ConnectControlSocket() {
245 ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
246 int err = control_socket_->Connect(server_address_);
247 if (err == SOCKET_ERROR) {
248 Close();
249 return false;
250 }
251 return true;
252 }
253
OnConnect(talk_base::AsyncSocket * socket)254 void PeerConnectionClient::OnConnect(talk_base::AsyncSocket* socket) {
255 ASSERT(!onconnect_data_.empty());
256 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
257 ASSERT(sent == onconnect_data_.length());
258 UNUSED(sent);
259 onconnect_data_.clear();
260 }
261
OnHangingGetConnect(talk_base::AsyncSocket * socket)262 void PeerConnectionClient::OnHangingGetConnect(talk_base::AsyncSocket* socket) {
263 char buffer[1024];
264 sprintfn(buffer, sizeof(buffer),
265 "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
266 int len = static_cast<int>(strlen(buffer));
267 int sent = socket->Send(buffer, len);
268 ASSERT(sent == len);
269 UNUSED2(sent, len);
270 }
271
OnMessageFromPeer(int peer_id,const std::string & message)272 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
273 const std::string& message) {
274 if (message.length() == (sizeof(kByeMessage) - 1) &&
275 message.compare(kByeMessage) == 0) {
276 callback_->OnPeerDisconnected(peer_id);
277 } else {
278 callback_->OnMessageFromPeer(peer_id, message);
279 }
280 }
281
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)282 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
283 size_t eoh,
284 const char* header_pattern,
285 size_t* value) {
286 ASSERT(value != NULL);
287 size_t found = data.find(header_pattern);
288 if (found != std::string::npos && found < eoh) {
289 *value = atoi(&data[found + strlen(header_pattern)]);
290 return true;
291 }
292 return false;
293 }
294
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)295 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
296 const char* header_pattern,
297 std::string* value) {
298 ASSERT(value != NULL);
299 size_t found = data.find(header_pattern);
300 if (found != std::string::npos && found < eoh) {
301 size_t begin = found + strlen(header_pattern);
302 size_t end = data.find("\r\n", begin);
303 if (end == std::string::npos)
304 end = eoh;
305 value->assign(data.substr(begin, end - begin));
306 return true;
307 }
308 return false;
309 }
310
ReadIntoBuffer(talk_base::AsyncSocket * socket,std::string * data,size_t * content_length)311 bool PeerConnectionClient::ReadIntoBuffer(talk_base::AsyncSocket* socket,
312 std::string* data,
313 size_t* content_length) {
314 char buffer[0xffff];
315 do {
316 int bytes = socket->Recv(buffer, sizeof(buffer));
317 if (bytes <= 0)
318 break;
319 data->append(buffer, bytes);
320 } while (true);
321
322 bool ret = false;
323 size_t i = data->find("\r\n\r\n");
324 if (i != std::string::npos) {
325 LOG(INFO) << "Headers received";
326 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
327 size_t total_response_size = (i + 4) + *content_length;
328 if (data->length() >= total_response_size) {
329 ret = true;
330 std::string should_close;
331 const char kConnection[] = "\r\nConnection: ";
332 if (GetHeaderValue(*data, i, kConnection, &should_close) &&
333 should_close.compare("close") == 0) {
334 socket->Close();
335 // Since we closed the socket, there was no notification delivered
336 // to us. Compensate by letting ourselves know.
337 OnClose(socket, 0);
338 }
339 } else {
340 // We haven't received everything. Just continue to accept data.
341 }
342 } else {
343 LOG(LS_ERROR) << "No content length field specified by the server.";
344 }
345 }
346 return ret;
347 }
348
OnRead(talk_base::AsyncSocket * socket)349 void PeerConnectionClient::OnRead(talk_base::AsyncSocket* socket) {
350 size_t content_length = 0;
351 if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
352 size_t peer_id = 0, eoh = 0;
353 bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
354 &eoh);
355 if (ok) {
356 if (my_id_ == -1) {
357 // First response. Let's store our server assigned ID.
358 ASSERT(state_ == SIGNING_IN);
359 my_id_ = static_cast<int>(peer_id);
360 ASSERT(my_id_ != -1);
361
362 // The body of the response will be a list of already connected peers.
363 if (content_length) {
364 size_t pos = eoh + 4;
365 while (pos < control_data_.size()) {
366 size_t eol = control_data_.find('\n', pos);
367 if (eol == std::string::npos)
368 break;
369 int id = 0;
370 std::string name;
371 bool connected;
372 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
373 &connected) && id != my_id_) {
374 peers_[id] = name;
375 callback_->OnPeerConnected(id, name);
376 }
377 pos = eol + 1;
378 }
379 }
380 ASSERT(is_connected());
381 callback_->OnSignedIn();
382 } else if (state_ == SIGNING_OUT) {
383 Close();
384 callback_->OnDisconnected();
385 } else if (state_ == SIGNING_OUT_WAITING) {
386 SignOut();
387 }
388 }
389
390 control_data_.clear();
391
392 if (state_ == SIGNING_IN) {
393 ASSERT(hanging_get_->GetState() == talk_base::Socket::CS_CLOSED);
394 state_ = CONNECTED;
395 hanging_get_->Connect(server_address_);
396 }
397 }
398 }
399
OnHangingGetRead(talk_base::AsyncSocket * socket)400 void PeerConnectionClient::OnHangingGetRead(talk_base::AsyncSocket* socket) {
401 LOG(INFO) << __FUNCTION__;
402 size_t content_length = 0;
403 if (ReadIntoBuffer(socket, ¬ification_data_, &content_length)) {
404 size_t peer_id = 0, eoh = 0;
405 bool ok = ParseServerResponse(notification_data_, content_length,
406 &peer_id, &eoh);
407
408 if (ok) {
409 // Store the position where the body begins.
410 size_t pos = eoh + 4;
411
412 if (my_id_ == static_cast<int>(peer_id)) {
413 // A notification about a new member or a member that just
414 // disconnected.
415 int id = 0;
416 std::string name;
417 bool connected = false;
418 if (ParseEntry(notification_data_.substr(pos), &name, &id,
419 &connected)) {
420 if (connected) {
421 peers_[id] = name;
422 callback_->OnPeerConnected(id, name);
423 } else {
424 peers_.erase(id);
425 callback_->OnPeerDisconnected(id);
426 }
427 }
428 } else {
429 OnMessageFromPeer(static_cast<int>(peer_id),
430 notification_data_.substr(pos));
431 }
432 }
433
434 notification_data_.clear();
435 }
436
437 if (hanging_get_->GetState() == talk_base::Socket::CS_CLOSED &&
438 state_ == CONNECTED) {
439 hanging_get_->Connect(server_address_);
440 }
441 }
442
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)443 bool PeerConnectionClient::ParseEntry(const std::string& entry,
444 std::string* name,
445 int* id,
446 bool* connected) {
447 ASSERT(name != NULL);
448 ASSERT(id != NULL);
449 ASSERT(connected != NULL);
450 ASSERT(!entry.empty());
451
452 *connected = false;
453 size_t separator = entry.find(',');
454 if (separator != std::string::npos) {
455 *id = atoi(&entry[separator + 1]);
456 name->assign(entry.substr(0, separator));
457 separator = entry.find(',', separator + 1);
458 if (separator != std::string::npos) {
459 *connected = atoi(&entry[separator + 1]) ? true : false;
460 }
461 }
462 return !name->empty();
463 }
464
GetResponseStatus(const std::string & response)465 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
466 int status = -1;
467 size_t pos = response.find(' ');
468 if (pos != std::string::npos)
469 status = atoi(&response[pos + 1]);
470 return status;
471 }
472
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)473 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
474 size_t content_length,
475 size_t* peer_id,
476 size_t* eoh) {
477 int status = GetResponseStatus(response.c_str());
478 if (status != 200) {
479 LOG(LS_ERROR) << "Received error from server";
480 Close();
481 callback_->OnDisconnected();
482 return false;
483 }
484
485 *eoh = response.find("\r\n\r\n");
486 ASSERT(*eoh != std::string::npos);
487 if (*eoh == std::string::npos)
488 return false;
489
490 *peer_id = -1;
491
492 // See comment in peer_channel.cc for why we use the Pragma header and
493 // not e.g. "X-Peer-Id".
494 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
495
496 return true;
497 }
498
OnClose(talk_base::AsyncSocket * socket,int err)499 void PeerConnectionClient::OnClose(talk_base::AsyncSocket* socket, int err) {
500 LOG(INFO) << __FUNCTION__;
501
502 socket->Close();
503
504 #ifdef WIN32
505 if (err != WSAECONNREFUSED) {
506 #else
507 if (err != ECONNREFUSED) {
508 #endif
509 if (socket == hanging_get_.get()) {
510 if (state_ == CONNECTED) {
511 hanging_get_->Close();
512 hanging_get_->Connect(server_address_);
513 }
514 } else {
515 callback_->OnMessageSent(err);
516 }
517 } else {
518 if (socket == control_socket_.get()) {
519 LOG(WARNING) << "Connection refused; retrying in 2 seconds";
520 talk_base::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
521 } else {
522 Close();
523 callback_->OnDisconnected();
524 }
525 }
526 }
527
528 void PeerConnectionClient::OnMessage(talk_base::Message* msg) {
529 // ignore msg; there is currently only one supported message ("retry")
530 DoConnect();
531 }
532