• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2012 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "examples/peerconnection/client/peer_connection_client.h"
12 
13 #include "api/units/time_delta.h"
14 #include "examples/peerconnection/client/defaults.h"
15 #include "rtc_base/checks.h"
16 #include "rtc_base/logging.h"
17 #include "rtc_base/net_helpers.h"
18 
19 namespace {
20 
21 // This is our magical hangup signal.
22 constexpr char kByeMessage[] = "BYE";
// Delay between server connection retries.
24 constexpr webrtc::TimeDelta kReconnectDelay = webrtc::TimeDelta::Seconds(2);
25 
CreateClientSocket(int family)26 rtc::Socket* CreateClientSocket(int family) {
27   rtc::Thread* thread = rtc::Thread::Current();
28   RTC_DCHECK(thread != NULL);
29   return thread->socketserver()->CreateSocket(family, SOCK_STREAM);
30 }
31 
32 }  // namespace
33 
PeerConnectionClient()34 PeerConnectionClient::PeerConnectionClient()
35     : callback_(NULL), resolver_(NULL), state_(NOT_CONNECTED), my_id_(-1) {}
36 
37 PeerConnectionClient::~PeerConnectionClient() = default;
38 
InitSocketSignals()39 void PeerConnectionClient::InitSocketSignals() {
40   RTC_DCHECK(control_socket_.get() != NULL);
41   RTC_DCHECK(hanging_get_.get() != NULL);
42   control_socket_->SignalCloseEvent.connect(this,
43                                             &PeerConnectionClient::OnClose);
44   hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose);
45   control_socket_->SignalConnectEvent.connect(this,
46                                               &PeerConnectionClient::OnConnect);
47   hanging_get_->SignalConnectEvent.connect(
48       this, &PeerConnectionClient::OnHangingGetConnect);
49   control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead);
50   hanging_get_->SignalReadEvent.connect(
51       this, &PeerConnectionClient::OnHangingGetRead);
52 }
53 
id() const54 int PeerConnectionClient::id() const {
55   return my_id_;
56 }
57 
is_connected() const58 bool PeerConnectionClient::is_connected() const {
59   return my_id_ != -1;
60 }
61 
peers() const62 const Peers& PeerConnectionClient::peers() const {
63   return peers_;
64 }
65 
RegisterObserver(PeerConnectionClientObserver * callback)66 void PeerConnectionClient::RegisterObserver(
67     PeerConnectionClientObserver* callback) {
68   RTC_DCHECK(!callback_);
69   callback_ = callback;
70 }
71 
Connect(const std::string & server,int port,const std::string & client_name)72 void PeerConnectionClient::Connect(const std::string& server,
73                                    int port,
74                                    const std::string& client_name) {
75   RTC_DCHECK(!server.empty());
76   RTC_DCHECK(!client_name.empty());
77 
78   if (state_ != NOT_CONNECTED) {
79     RTC_LOG(LS_WARNING)
80         << "The client must not be connected before you can call Connect()";
81     callback_->OnServerConnectionFailure();
82     return;
83   }
84 
85   if (server.empty() || client_name.empty()) {
86     callback_->OnServerConnectionFailure();
87     return;
88   }
89 
90   if (port <= 0)
91     port = kDefaultServerPort;
92 
93   server_address_.SetIP(server);
94   server_address_.SetPort(port);
95   client_name_ = client_name;
96 
97   if (server_address_.IsUnresolvedIP()) {
98     state_ = RESOLVING;
99     resolver_ = new rtc::AsyncResolver();
100     resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
101     resolver_->Start(server_address_);
102   } else {
103     DoConnect();
104   }
105 }
106 
OnResolveResult(rtc::AsyncResolverInterface * resolver)107 void PeerConnectionClient::OnResolveResult(
108     rtc::AsyncResolverInterface* resolver) {
109   if (resolver_->GetError() != 0) {
110     callback_->OnServerConnectionFailure();
111     resolver_->Destroy(false);
112     resolver_ = NULL;
113     state_ = NOT_CONNECTED;
114   } else {
115     server_address_ = resolver_->address();
116     DoConnect();
117   }
118 }
119 
DoConnect()120 void PeerConnectionClient::DoConnect() {
121   control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
122   hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
123   InitSocketSignals();
124   char buffer[1024];
125   snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n",
126            client_name_.c_str());
127   onconnect_data_ = buffer;
128 
129   bool ret = ConnectControlSocket();
130   if (ret)
131     state_ = SIGNING_IN;
132   if (!ret) {
133     callback_->OnServerConnectionFailure();
134   }
135 }
136 
SendToPeer(int peer_id,const std::string & message)137 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
138   if (state_ != CONNECTED)
139     return false;
140 
141   RTC_DCHECK(is_connected());
142   RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
143   if (!is_connected() || peer_id == -1)
144     return false;
145 
146   char headers[1024];
147   snprintf(headers, sizeof(headers),
148            "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
149            "Content-Length: %zu\r\n"
150            "Content-Type: text/plain\r\n"
151            "\r\n",
152            my_id_, peer_id, message.length());
153   onconnect_data_ = headers;
154   onconnect_data_ += message;
155   return ConnectControlSocket();
156 }
157 
SendHangUp(int peer_id)158 bool PeerConnectionClient::SendHangUp(int peer_id) {
159   return SendToPeer(peer_id, kByeMessage);
160 }
161 
IsSendingMessage()162 bool PeerConnectionClient::IsSendingMessage() {
163   return state_ == CONNECTED &&
164          control_socket_->GetState() != rtc::Socket::CS_CLOSED;
165 }
166 
SignOut()167 bool PeerConnectionClient::SignOut() {
168   if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
169     return true;
170 
171   if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
172     hanging_get_->Close();
173 
174   if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
175     state_ = SIGNING_OUT;
176 
177     if (my_id_ != -1) {
178       char buffer[1024];
179       snprintf(buffer, sizeof(buffer),
180                "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
181       onconnect_data_ = buffer;
182       return ConnectControlSocket();
183     } else {
184       // Can occur if the app is closed before we finish connecting.
185       return true;
186     }
187   } else {
188     state_ = SIGNING_OUT_WAITING;
189   }
190 
191   return true;
192 }
193 
Close()194 void PeerConnectionClient::Close() {
195   control_socket_->Close();
196   hanging_get_->Close();
197   onconnect_data_.clear();
198   peers_.clear();
199   if (resolver_ != NULL) {
200     resolver_->Destroy(false);
201     resolver_ = NULL;
202   }
203   my_id_ = -1;
204   state_ = NOT_CONNECTED;
205 }
206 
ConnectControlSocket()207 bool PeerConnectionClient::ConnectControlSocket() {
208   RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
209   int err = control_socket_->Connect(server_address_);
210   if (err == SOCKET_ERROR) {
211     Close();
212     return false;
213   }
214   return true;
215 }
216 
OnConnect(rtc::Socket * socket)217 void PeerConnectionClient::OnConnect(rtc::Socket* socket) {
218   RTC_DCHECK(!onconnect_data_.empty());
219   size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
220   RTC_DCHECK(sent == onconnect_data_.length());
221   onconnect_data_.clear();
222 }
223 
OnHangingGetConnect(rtc::Socket * socket)224 void PeerConnectionClient::OnHangingGetConnect(rtc::Socket* socket) {
225   char buffer[1024];
226   snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n",
227            my_id_);
228   int len = static_cast<int>(strlen(buffer));
229   int sent = socket->Send(buffer, len);
230   RTC_DCHECK(sent == len);
231 }
232 
OnMessageFromPeer(int peer_id,const std::string & message)233 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
234                                              const std::string& message) {
235   if (message.length() == (sizeof(kByeMessage) - 1) &&
236       message.compare(kByeMessage) == 0) {
237     callback_->OnPeerDisconnected(peer_id);
238   } else {
239     callback_->OnMessageFromPeer(peer_id, message);
240   }
241 }
242 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)243 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
244                                           size_t eoh,
245                                           const char* header_pattern,
246                                           size_t* value) {
247   RTC_DCHECK(value != NULL);
248   size_t found = data.find(header_pattern);
249   if (found != std::string::npos && found < eoh) {
250     *value = atoi(&data[found + strlen(header_pattern)]);
251     return true;
252   }
253   return false;
254 }
255 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)256 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
257                                           size_t eoh,
258                                           const char* header_pattern,
259                                           std::string* value) {
260   RTC_DCHECK(value != NULL);
261   size_t found = data.find(header_pattern);
262   if (found != std::string::npos && found < eoh) {
263     size_t begin = found + strlen(header_pattern);
264     size_t end = data.find("\r\n", begin);
265     if (end == std::string::npos)
266       end = eoh;
267     value->assign(data.substr(begin, end - begin));
268     return true;
269   }
270   return false;
271 }
272 
ReadIntoBuffer(rtc::Socket * socket,std::string * data,size_t * content_length)273 bool PeerConnectionClient::ReadIntoBuffer(rtc::Socket* socket,
274                                           std::string* data,
275                                           size_t* content_length) {
276   char buffer[0xffff];
277   do {
278     int bytes = socket->Recv(buffer, sizeof(buffer), nullptr);
279     if (bytes <= 0)
280       break;
281     data->append(buffer, bytes);
282   } while (true);
283 
284   bool ret = false;
285   size_t i = data->find("\r\n\r\n");
286   if (i != std::string::npos) {
287     RTC_LOG(LS_INFO) << "Headers received";
288     if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
289       size_t total_response_size = (i + 4) + *content_length;
290       if (data->length() >= total_response_size) {
291         ret = true;
292         std::string should_close;
293         const char kConnection[] = "\r\nConnection: ";
294         if (GetHeaderValue(*data, i, kConnection, &should_close) &&
295             should_close.compare("close") == 0) {
296           socket->Close();
297           // Since we closed the socket, there was no notification delivered
298           // to us.  Compensate by letting ourselves know.
299           OnClose(socket, 0);
300         }
301       } else {
302         // We haven't received everything.  Just continue to accept data.
303       }
304     } else {
305       RTC_LOG(LS_ERROR) << "No content length field specified by the server.";
306     }
307   }
308   return ret;
309 }
310 
OnRead(rtc::Socket * socket)311 void PeerConnectionClient::OnRead(rtc::Socket* socket) {
312   size_t content_length = 0;
313   if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
314     size_t peer_id = 0, eoh = 0;
315     bool ok =
316         ParseServerResponse(control_data_, content_length, &peer_id, &eoh);
317     if (ok) {
318       if (my_id_ == -1) {
319         // First response.  Let's store our server assigned ID.
320         RTC_DCHECK(state_ == SIGNING_IN);
321         my_id_ = static_cast<int>(peer_id);
322         RTC_DCHECK(my_id_ != -1);
323 
324         // The body of the response will be a list of already connected peers.
325         if (content_length) {
326           size_t pos = eoh + 4;
327           while (pos < control_data_.size()) {
328             size_t eol = control_data_.find('\n', pos);
329             if (eol == std::string::npos)
330               break;
331             int id = 0;
332             std::string name;
333             bool connected;
334             if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
335                            &connected) &&
336                 id != my_id_) {
337               peers_[id] = name;
338               callback_->OnPeerConnected(id, name);
339             }
340             pos = eol + 1;
341           }
342         }
343         RTC_DCHECK(is_connected());
344         callback_->OnSignedIn();
345       } else if (state_ == SIGNING_OUT) {
346         Close();
347         callback_->OnDisconnected();
348       } else if (state_ == SIGNING_OUT_WAITING) {
349         SignOut();
350       }
351     }
352 
353     control_data_.clear();
354 
355     if (state_ == SIGNING_IN) {
356       RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
357       state_ = CONNECTED;
358       hanging_get_->Connect(server_address_);
359     }
360   }
361 }
362 
OnHangingGetRead(rtc::Socket * socket)363 void PeerConnectionClient::OnHangingGetRead(rtc::Socket* socket) {
364   RTC_LOG(LS_INFO) << __FUNCTION__;
365   size_t content_length = 0;
366   if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
367     size_t peer_id = 0, eoh = 0;
368     bool ok =
369         ParseServerResponse(notification_data_, content_length, &peer_id, &eoh);
370 
371     if (ok) {
372       // Store the position where the body begins.
373       size_t pos = eoh + 4;
374 
375       if (my_id_ == static_cast<int>(peer_id)) {
376         // A notification about a new member or a member that just
377         // disconnected.
378         int id = 0;
379         std::string name;
380         bool connected = false;
381         if (ParseEntry(notification_data_.substr(pos), &name, &id,
382                        &connected)) {
383           if (connected) {
384             peers_[id] = name;
385             callback_->OnPeerConnected(id, name);
386           } else {
387             peers_.erase(id);
388             callback_->OnPeerDisconnected(id);
389           }
390         }
391       } else {
392         OnMessageFromPeer(static_cast<int>(peer_id),
393                           notification_data_.substr(pos));
394       }
395     }
396 
397     notification_data_.clear();
398   }
399 
400   if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
401       state_ == CONNECTED) {
402     hanging_get_->Connect(server_address_);
403   }
404 }
405 
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)406 bool PeerConnectionClient::ParseEntry(const std::string& entry,
407                                       std::string* name,
408                                       int* id,
409                                       bool* connected) {
410   RTC_DCHECK(name != NULL);
411   RTC_DCHECK(id != NULL);
412   RTC_DCHECK(connected != NULL);
413   RTC_DCHECK(!entry.empty());
414 
415   *connected = false;
416   size_t separator = entry.find(',');
417   if (separator != std::string::npos) {
418     *id = atoi(&entry[separator + 1]);
419     name->assign(entry.substr(0, separator));
420     separator = entry.find(',', separator + 1);
421     if (separator != std::string::npos) {
422       *connected = atoi(&entry[separator + 1]) ? true : false;
423     }
424   }
425   return !name->empty();
426 }
427 
GetResponseStatus(const std::string & response)428 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
429   int status = -1;
430   size_t pos = response.find(' ');
431   if (pos != std::string::npos)
432     status = atoi(&response[pos + 1]);
433   return status;
434 }
435 
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)436 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
437                                                size_t content_length,
438                                                size_t* peer_id,
439                                                size_t* eoh) {
440   int status = GetResponseStatus(response.c_str());
441   if (status != 200) {
442     RTC_LOG(LS_ERROR) << "Received error from server";
443     Close();
444     callback_->OnDisconnected();
445     return false;
446   }
447 
448   *eoh = response.find("\r\n\r\n");
449   RTC_DCHECK(*eoh != std::string::npos);
450   if (*eoh == std::string::npos)
451     return false;
452 
453   *peer_id = -1;
454 
455   // See comment in peer_channel.cc for why we use the Pragma header.
456   GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
457 
458   return true;
459 }
460 
OnClose(rtc::Socket * socket,int err)461 void PeerConnectionClient::OnClose(rtc::Socket* socket, int err) {
462   RTC_LOG(LS_INFO) << __FUNCTION__;
463 
464   socket->Close();
465 
466 #ifdef WIN32
467   if (err != WSAECONNREFUSED) {
468 #else
469   if (err != ECONNREFUSED) {
470 #endif
471     if (socket == hanging_get_.get()) {
472       if (state_ == CONNECTED) {
473         hanging_get_->Close();
474         hanging_get_->Connect(server_address_);
475       }
476     } else {
477       callback_->OnMessageSent(err);
478     }
479   } else {
480     if (socket == control_socket_.get()) {
481       RTC_LOG(LS_WARNING) << "Connection refused; retrying in 2 seconds";
482       rtc::Thread::Current()->PostDelayedTask(
483           SafeTask(safety_.flag(), [this] { DoConnect(); }), kReconnectDelay);
484     } else {
485       Close();
486       callback_->OnDisconnected();
487     }
488   }
489 }
490