• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2012, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/examples/peerconnection/client/peer_connection_client.h"
29 
30 #include "talk/examples/peerconnection/client/defaults.h"
31 #include "talk/base/common.h"
32 #include "talk/base/nethelpers.h"
33 #include "talk/base/logging.h"
34 #include "talk/base/stringutils.h"
35 
36 #ifdef WIN32
37 #include "talk/base/win32socketserver.h"
38 #endif
39 
40 using talk_base::sprintfn;
41 
42 namespace {
43 
44 // This is our magical hangup signal.
45 const char kByeMessage[] = "BYE";
46 // Delay between server connection retries, in milliseconds
47 const int kReconnectDelay = 2000;
48 
CreateClientSocket(int family)49 talk_base::AsyncSocket* CreateClientSocket(int family) {
50 #ifdef WIN32
51   talk_base::Win32Socket* sock = new talk_base::Win32Socket();
52   sock->CreateT(family, SOCK_STREAM);
53   return sock;
54 #elif defined(POSIX)
55   talk_base::Thread* thread = talk_base::Thread::Current();
56   ASSERT(thread != NULL);
57   return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
58 #else
59 #error Platform not supported.
60 #endif
61 }
62 
63 }
64 
PeerConnectionClient()65 PeerConnectionClient::PeerConnectionClient()
66   : callback_(NULL),
67     resolver_(NULL),
68     state_(NOT_CONNECTED),
69     my_id_(-1) {
70 }
71 
~PeerConnectionClient()72 PeerConnectionClient::~PeerConnectionClient() {
73 }
74 
InitSocketSignals()75 void PeerConnectionClient::InitSocketSignals() {
76   ASSERT(control_socket_.get() != NULL);
77   ASSERT(hanging_get_.get() != NULL);
78   control_socket_->SignalCloseEvent.connect(this,
79       &PeerConnectionClient::OnClose);
80   hanging_get_->SignalCloseEvent.connect(this,
81       &PeerConnectionClient::OnClose);
82   control_socket_->SignalConnectEvent.connect(this,
83       &PeerConnectionClient::OnConnect);
84   hanging_get_->SignalConnectEvent.connect(this,
85       &PeerConnectionClient::OnHangingGetConnect);
86   control_socket_->SignalReadEvent.connect(this,
87       &PeerConnectionClient::OnRead);
88   hanging_get_->SignalReadEvent.connect(this,
89       &PeerConnectionClient::OnHangingGetRead);
90 }
91 
id() const92 int PeerConnectionClient::id() const {
93   return my_id_;
94 }
95 
is_connected() const96 bool PeerConnectionClient::is_connected() const {
97   return my_id_ != -1;
98 }
99 
peers() const100 const Peers& PeerConnectionClient::peers() const {
101   return peers_;
102 }
103 
RegisterObserver(PeerConnectionClientObserver * callback)104 void PeerConnectionClient::RegisterObserver(
105     PeerConnectionClientObserver* callback) {
106   ASSERT(!callback_);
107   callback_ = callback;
108 }
109 
Connect(const std::string & server,int port,const std::string & client_name)110 void PeerConnectionClient::Connect(const std::string& server, int port,
111                                    const std::string& client_name) {
112   ASSERT(!server.empty());
113   ASSERT(!client_name.empty());
114 
115   if (state_ != NOT_CONNECTED) {
116     LOG(WARNING)
117         << "The client must not be connected before you can call Connect()";
118     callback_->OnServerConnectionFailure();
119     return;
120   }
121 
122   if (server.empty() || client_name.empty()) {
123     callback_->OnServerConnectionFailure();
124     return;
125   }
126 
127   if (port <= 0)
128     port = kDefaultServerPort;
129 
130   server_address_.SetIP(server);
131   server_address_.SetPort(port);
132   client_name_ = client_name;
133 
134   if (server_address_.IsUnresolved()) {
135     state_ = RESOLVING;
136     resolver_ = new talk_base::AsyncResolver();
137     resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
138     resolver_->Start(server_address_);
139   } else {
140     DoConnect();
141   }
142 }
143 
OnResolveResult(talk_base::AsyncResolverInterface * resolver)144 void PeerConnectionClient::OnResolveResult(
145     talk_base::AsyncResolverInterface* resolver) {
146   if (resolver_->GetError() != 0) {
147     callback_->OnServerConnectionFailure();
148     resolver_->Destroy(false);
149     resolver_ = NULL;
150     state_ = NOT_CONNECTED;
151   } else {
152     server_address_ = resolver_->address();
153     DoConnect();
154   }
155 }
156 
DoConnect()157 void PeerConnectionClient::DoConnect() {
158   control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
159   hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
160   InitSocketSignals();
161   char buffer[1024];
162   sprintfn(buffer, sizeof(buffer),
163            "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
164   onconnect_data_ = buffer;
165 
166   bool ret = ConnectControlSocket();
167   if (ret)
168     state_ = SIGNING_IN;
169   if (!ret) {
170     callback_->OnServerConnectionFailure();
171   }
172 }
173 
SendToPeer(int peer_id,const std::string & message)174 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
175   if (state_ != CONNECTED)
176     return false;
177 
178   ASSERT(is_connected());
179   ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
180   if (!is_connected() || peer_id == -1)
181     return false;
182 
183   char headers[1024];
184   sprintfn(headers, sizeof(headers),
185       "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
186       "Content-Length: %i\r\n"
187       "Content-Type: text/plain\r\n"
188       "\r\n",
189       my_id_, peer_id, message.length());
190   onconnect_data_ = headers;
191   onconnect_data_ += message;
192   return ConnectControlSocket();
193 }
194 
SendHangUp(int peer_id)195 bool PeerConnectionClient::SendHangUp(int peer_id) {
196   return SendToPeer(peer_id, kByeMessage);
197 }
198 
IsSendingMessage()199 bool PeerConnectionClient::IsSendingMessage() {
200   return state_ == CONNECTED &&
201          control_socket_->GetState() != talk_base::Socket::CS_CLOSED;
202 }
203 
SignOut()204 bool PeerConnectionClient::SignOut() {
205   if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
206     return true;
207 
208   if (hanging_get_->GetState() != talk_base::Socket::CS_CLOSED)
209     hanging_get_->Close();
210 
211   if (control_socket_->GetState() == talk_base::Socket::CS_CLOSED) {
212     state_ = SIGNING_OUT;
213 
214     if (my_id_ != -1) {
215       char buffer[1024];
216       sprintfn(buffer, sizeof(buffer),
217           "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
218       onconnect_data_ = buffer;
219       return ConnectControlSocket();
220     } else {
221       // Can occur if the app is closed before we finish connecting.
222       return true;
223     }
224   } else {
225     state_ = SIGNING_OUT_WAITING;
226   }
227 
228   return true;
229 }
230 
Close()231 void PeerConnectionClient::Close() {
232   control_socket_->Close();
233   hanging_get_->Close();
234   onconnect_data_.clear();
235   peers_.clear();
236   if (resolver_ != NULL) {
237     resolver_->Destroy(false);
238     resolver_ = NULL;
239   }
240   my_id_ = -1;
241   state_ = NOT_CONNECTED;
242 }
243 
ConnectControlSocket()244 bool PeerConnectionClient::ConnectControlSocket() {
245   ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
246   int err = control_socket_->Connect(server_address_);
247   if (err == SOCKET_ERROR) {
248     Close();
249     return false;
250   }
251   return true;
252 }
253 
OnConnect(talk_base::AsyncSocket * socket)254 void PeerConnectionClient::OnConnect(talk_base::AsyncSocket* socket) {
255   ASSERT(!onconnect_data_.empty());
256   size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
257   ASSERT(sent == onconnect_data_.length());
258   UNUSED(sent);
259   onconnect_data_.clear();
260 }
261 
OnHangingGetConnect(talk_base::AsyncSocket * socket)262 void PeerConnectionClient::OnHangingGetConnect(talk_base::AsyncSocket* socket) {
263   char buffer[1024];
264   sprintfn(buffer, sizeof(buffer),
265            "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
266   int len = static_cast<int>(strlen(buffer));
267   int sent = socket->Send(buffer, len);
268   ASSERT(sent == len);
269   UNUSED2(sent, len);
270 }
271 
OnMessageFromPeer(int peer_id,const std::string & message)272 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
273                                              const std::string& message) {
274   if (message.length() == (sizeof(kByeMessage) - 1) &&
275       message.compare(kByeMessage) == 0) {
276     callback_->OnPeerDisconnected(peer_id);
277   } else {
278     callback_->OnMessageFromPeer(peer_id, message);
279   }
280 }
281 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)282 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
283                                           size_t eoh,
284                                           const char* header_pattern,
285                                           size_t* value) {
286   ASSERT(value != NULL);
287   size_t found = data.find(header_pattern);
288   if (found != std::string::npos && found < eoh) {
289     *value = atoi(&data[found + strlen(header_pattern)]);
290     return true;
291   }
292   return false;
293 }
294 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)295 bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
296                                           const char* header_pattern,
297                                           std::string* value) {
298   ASSERT(value != NULL);
299   size_t found = data.find(header_pattern);
300   if (found != std::string::npos && found < eoh) {
301     size_t begin = found + strlen(header_pattern);
302     size_t end = data.find("\r\n", begin);
303     if (end == std::string::npos)
304       end = eoh;
305     value->assign(data.substr(begin, end - begin));
306     return true;
307   }
308   return false;
309 }
310 
ReadIntoBuffer(talk_base::AsyncSocket * socket,std::string * data,size_t * content_length)311 bool PeerConnectionClient::ReadIntoBuffer(talk_base::AsyncSocket* socket,
312                                           std::string* data,
313                                           size_t* content_length) {
314   char buffer[0xffff];
315   do {
316     int bytes = socket->Recv(buffer, sizeof(buffer));
317     if (bytes <= 0)
318       break;
319     data->append(buffer, bytes);
320   } while (true);
321 
322   bool ret = false;
323   size_t i = data->find("\r\n\r\n");
324   if (i != std::string::npos) {
325     LOG(INFO) << "Headers received";
326     if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
327       size_t total_response_size = (i + 4) + *content_length;
328       if (data->length() >= total_response_size) {
329         ret = true;
330         std::string should_close;
331         const char kConnection[] = "\r\nConnection: ";
332         if (GetHeaderValue(*data, i, kConnection, &should_close) &&
333             should_close.compare("close") == 0) {
334           socket->Close();
335           // Since we closed the socket, there was no notification delivered
336           // to us.  Compensate by letting ourselves know.
337           OnClose(socket, 0);
338         }
339       } else {
340         // We haven't received everything.  Just continue to accept data.
341       }
342     } else {
343       LOG(LS_ERROR) << "No content length field specified by the server.";
344     }
345   }
346   return ret;
347 }
348 
OnRead(talk_base::AsyncSocket * socket)349 void PeerConnectionClient::OnRead(talk_base::AsyncSocket* socket) {
350   size_t content_length = 0;
351   if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
352     size_t peer_id = 0, eoh = 0;
353     bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
354                                   &eoh);
355     if (ok) {
356       if (my_id_ == -1) {
357         // First response.  Let's store our server assigned ID.
358         ASSERT(state_ == SIGNING_IN);
359         my_id_ = static_cast<int>(peer_id);
360         ASSERT(my_id_ != -1);
361 
362         // The body of the response will be a list of already connected peers.
363         if (content_length) {
364           size_t pos = eoh + 4;
365           while (pos < control_data_.size()) {
366             size_t eol = control_data_.find('\n', pos);
367             if (eol == std::string::npos)
368               break;
369             int id = 0;
370             std::string name;
371             bool connected;
372             if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
373                            &connected) && id != my_id_) {
374               peers_[id] = name;
375               callback_->OnPeerConnected(id, name);
376             }
377             pos = eol + 1;
378           }
379         }
380         ASSERT(is_connected());
381         callback_->OnSignedIn();
382       } else if (state_ == SIGNING_OUT) {
383         Close();
384         callback_->OnDisconnected();
385       } else if (state_ == SIGNING_OUT_WAITING) {
386         SignOut();
387       }
388     }
389 
390     control_data_.clear();
391 
392     if (state_ == SIGNING_IN) {
393       ASSERT(hanging_get_->GetState() == talk_base::Socket::CS_CLOSED);
394       state_ = CONNECTED;
395       hanging_get_->Connect(server_address_);
396     }
397   }
398 }
399 
OnHangingGetRead(talk_base::AsyncSocket * socket)400 void PeerConnectionClient::OnHangingGetRead(talk_base::AsyncSocket* socket) {
401   LOG(INFO) << __FUNCTION__;
402   size_t content_length = 0;
403   if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
404     size_t peer_id = 0, eoh = 0;
405     bool ok = ParseServerResponse(notification_data_, content_length,
406                                   &peer_id, &eoh);
407 
408     if (ok) {
409       // Store the position where the body begins.
410       size_t pos = eoh + 4;
411 
412       if (my_id_ == static_cast<int>(peer_id)) {
413         // A notification about a new member or a member that just
414         // disconnected.
415         int id = 0;
416         std::string name;
417         bool connected = false;
418         if (ParseEntry(notification_data_.substr(pos), &name, &id,
419                        &connected)) {
420           if (connected) {
421             peers_[id] = name;
422             callback_->OnPeerConnected(id, name);
423           } else {
424             peers_.erase(id);
425             callback_->OnPeerDisconnected(id);
426           }
427         }
428       } else {
429         OnMessageFromPeer(static_cast<int>(peer_id),
430                           notification_data_.substr(pos));
431       }
432     }
433 
434     notification_data_.clear();
435   }
436 
437   if (hanging_get_->GetState() == talk_base::Socket::CS_CLOSED &&
438       state_ == CONNECTED) {
439     hanging_get_->Connect(server_address_);
440   }
441 }
442 
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)443 bool PeerConnectionClient::ParseEntry(const std::string& entry,
444                                       std::string* name,
445                                       int* id,
446                                       bool* connected) {
447   ASSERT(name != NULL);
448   ASSERT(id != NULL);
449   ASSERT(connected != NULL);
450   ASSERT(!entry.empty());
451 
452   *connected = false;
453   size_t separator = entry.find(',');
454   if (separator != std::string::npos) {
455     *id = atoi(&entry[separator + 1]);
456     name->assign(entry.substr(0, separator));
457     separator = entry.find(',', separator + 1);
458     if (separator != std::string::npos) {
459       *connected = atoi(&entry[separator + 1]) ? true : false;
460     }
461   }
462   return !name->empty();
463 }
464 
GetResponseStatus(const std::string & response)465 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
466   int status = -1;
467   size_t pos = response.find(' ');
468   if (pos != std::string::npos)
469     status = atoi(&response[pos + 1]);
470   return status;
471 }
472 
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)473 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
474                                                size_t content_length,
475                                                size_t* peer_id,
476                                                size_t* eoh) {
477   int status = GetResponseStatus(response.c_str());
478   if (status != 200) {
479     LOG(LS_ERROR) << "Received error from server";
480     Close();
481     callback_->OnDisconnected();
482     return false;
483   }
484 
485   *eoh = response.find("\r\n\r\n");
486   ASSERT(*eoh != std::string::npos);
487   if (*eoh == std::string::npos)
488     return false;
489 
490   *peer_id = -1;
491 
492   // See comment in peer_channel.cc for why we use the Pragma header and
493   // not e.g. "X-Peer-Id".
494   GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
495 
496   return true;
497 }
498 
OnClose(talk_base::AsyncSocket * socket,int err)499 void PeerConnectionClient::OnClose(talk_base::AsyncSocket* socket, int err) {
500   LOG(INFO) << __FUNCTION__;
501 
502   socket->Close();
503 
504 #ifdef WIN32
505   if (err != WSAECONNREFUSED) {
506 #else
507   if (err != ECONNREFUSED) {
508 #endif
509     if (socket == hanging_get_.get()) {
510       if (state_ == CONNECTED) {
511         hanging_get_->Close();
512         hanging_get_->Connect(server_address_);
513       }
514     } else {
515       callback_->OnMessageSent(err);
516     }
517   } else {
518     if (socket == control_socket_.get()) {
519       LOG(WARNING) << "Connection refused; retrying in 2 seconds";
520       talk_base::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
521     } else {
522       Close();
523       callback_->OnDisconnected();
524     }
525   }
526 }
527 
528 void PeerConnectionClient::OnMessage(talk_base::Message* msg) {
529   // ignore msg; there is currently only one supported message ("retry")
530   DoConnect();
531 }
532