• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2011 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "examples/peerconnection/server/peer_channel.h"
12 
13 #include <assert.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 
17 #include <algorithm>
18 
19 #include "examples/peerconnection/server/data_socket.h"
20 #include "examples/peerconnection/server/utils.h"
21 
22 // Set to the peer id of the originator when messages are being
23 // exchanged between peers, but set to the id of the receiving peer
24 // itself when notifications are sent from the server about the state
25 // of other peers.
26 //
27 // WORKAROUND: Since support for CORS varies greatly from one browser to the
28 // next, we don't use a custom name for our peer-id header (originally it was
29 // "X-Peer-Id: ").  Instead, we use a "simple header", "Pragma" which should
30 // always be exposed to CORS requests.  There is a special CORS header devoted
31 // to exposing proprietary headers (Access-Control-Expose-Headers), however
32 // at this point it is not working correctly in some popular browsers.
33 static const char kPeerIdHeader[] = "Pragma: ";
34 
35 static const char* kRequestPaths[] = {
36     "/wait",
37     "/sign_out",
38     "/message",
39 };
40 
41 enum RequestPathIndex {
42   kWait,
43   kSignOut,
44   kMessage,
45 };
46 
47 const size_t kMaxNameLength = 512;
48 
49 //
50 // ChannelMember
51 //
52 
53 int ChannelMember::s_member_id_ = 0;
54 
ChannelMember(DataSocket * socket)55 ChannelMember::ChannelMember(DataSocket* socket)
56     : waiting_socket_(NULL),
57       id_(++s_member_id_),
58       connected_(true),
59       timestamp_(time(NULL)) {
60   assert(socket);
61   assert(socket->method() == DataSocket::GET);
62   assert(socket->PathEquals("/sign_in"));
63   name_ = socket->request_arguments();
64   if (name_.empty())
65     name_ = "peer_" + int2str(id_);
66   else if (name_.length() > kMaxNameLength)
67     name_.resize(kMaxNameLength);
68 
69   std::replace(name_.begin(), name_.end(), ',', '_');
70 }
71 
~ChannelMember()72 ChannelMember::~ChannelMember() {}
73 
is_wait_request(DataSocket * ds) const74 bool ChannelMember::is_wait_request(DataSocket* ds) const {
75   return ds && ds->PathEquals(kRequestPaths[kWait]);
76 }
77 
TimedOut()78 bool ChannelMember::TimedOut() {
79   return waiting_socket_ == NULL && (time(NULL) - timestamp_) > 30;
80 }
81 
GetPeerIdHeader() const82 std::string ChannelMember::GetPeerIdHeader() const {
83   std::string ret(kPeerIdHeader + int2str(id_) + "\r\n");
84   return ret;
85 }
86 
NotifyOfOtherMember(const ChannelMember & other)87 bool ChannelMember::NotifyOfOtherMember(const ChannelMember& other) {
88   assert(&other != this);
89   QueueResponse("200 OK", "text/plain", GetPeerIdHeader(), other.GetEntry());
90   return true;
91 }
92 
93 // Returns a string in the form "name,id,connected\n".
GetEntry() const94 std::string ChannelMember::GetEntry() const {
95   assert(name_.length() <= kMaxNameLength);
96 
97   // name, 11-digit int, 1-digit bool, newline, null
98   char entry[kMaxNameLength + 15];
99   snprintf(entry, sizeof(entry), "%s,%d,%d\n",
100            name_.substr(0, kMaxNameLength).c_str(), id_, connected_);
101   return entry;
102 }
103 
ForwardRequestToPeer(DataSocket * ds,ChannelMember * peer)104 void ChannelMember::ForwardRequestToPeer(DataSocket* ds, ChannelMember* peer) {
105   assert(peer);
106   assert(ds);
107 
108   std::string extra_headers(GetPeerIdHeader());
109 
110   if (peer == this) {
111     ds->Send("200 OK", true, ds->content_type(), extra_headers, ds->data());
112   } else {
113     printf("Client %s sending to %s\n", name_.c_str(), peer->name().c_str());
114     peer->QueueResponse("200 OK", ds->content_type(), extra_headers,
115                         ds->data());
116     ds->Send("200 OK", true, "text/plain", "", "");
117   }
118 }
119 
OnClosing(DataSocket * ds)120 void ChannelMember::OnClosing(DataSocket* ds) {
121   if (ds == waiting_socket_) {
122     waiting_socket_ = NULL;
123     timestamp_ = time(NULL);
124   }
125 }
126 
QueueResponse(const std::string & status,const std::string & content_type,const std::string & extra_headers,const std::string & data)127 void ChannelMember::QueueResponse(const std::string& status,
128                                   const std::string& content_type,
129                                   const std::string& extra_headers,
130                                   const std::string& data) {
131   if (waiting_socket_) {
132     assert(queue_.empty());
133     assert(waiting_socket_->method() == DataSocket::GET);
134     bool ok =
135         waiting_socket_->Send(status, true, content_type, extra_headers, data);
136     if (!ok) {
137       printf("Failed to deliver data to waiting socket\n");
138     }
139     waiting_socket_ = NULL;
140     timestamp_ = time(NULL);
141   } else {
142     QueuedResponse qr;
143     qr.status = status;
144     qr.content_type = content_type;
145     qr.extra_headers = extra_headers;
146     qr.data = data;
147     queue_.push(qr);
148   }
149 }
150 
SetWaitingSocket(DataSocket * ds)151 void ChannelMember::SetWaitingSocket(DataSocket* ds) {
152   assert(ds->method() == DataSocket::GET);
153   if (ds && !queue_.empty()) {
154     assert(waiting_socket_ == NULL);
155     const QueuedResponse& response = queue_.front();
156     ds->Send(response.status, true, response.content_type,
157              response.extra_headers, response.data);
158     queue_.pop();
159   } else {
160     waiting_socket_ = ds;
161   }
162 }
163 
164 //
165 // PeerChannel
166 //
167 
168 // static
IsPeerConnection(const DataSocket * ds)169 bool PeerChannel::IsPeerConnection(const DataSocket* ds) {
170   assert(ds);
171   return (ds->method() == DataSocket::POST && ds->content_length() > 0) ||
172          (ds->method() == DataSocket::GET && ds->PathEquals("/sign_in"));
173 }
174 
Lookup(DataSocket * ds) const175 ChannelMember* PeerChannel::Lookup(DataSocket* ds) const {
176   assert(ds);
177 
178   if (ds->method() != DataSocket::GET && ds->method() != DataSocket::POST)
179     return NULL;
180 
181   size_t i = 0;
182   for (; i < ARRAYSIZE(kRequestPaths); ++i) {
183     if (ds->PathEquals(kRequestPaths[i]))
184       break;
185   }
186 
187   if (i == ARRAYSIZE(kRequestPaths))
188     return NULL;
189 
190   std::string args(ds->request_arguments());
191   static const char kPeerId[] = "peer_id=";
192   size_t found = args.find(kPeerId);
193   if (found == std::string::npos)
194     return NULL;
195 
196   int id = atoi(&args[found + ARRAYSIZE(kPeerId) - 1]);
197   Members::const_iterator iter = members_.begin();
198   for (; iter != members_.end(); ++iter) {
199     if (id == (*iter)->id()) {
200       if (i == kWait)
201         (*iter)->SetWaitingSocket(ds);
202       if (i == kSignOut)
203         (*iter)->set_disconnected();
204       return *iter;
205     }
206   }
207 
208   return NULL;
209 }
210 
IsTargetedRequest(const DataSocket * ds) const211 ChannelMember* PeerChannel::IsTargetedRequest(const DataSocket* ds) const {
212   assert(ds);
213   // Regardless of GET or POST, we look for the peer_id parameter
214   // only in the request_path.
215   const std::string& path = ds->request_path();
216   size_t args = path.find('?');
217   if (args == std::string::npos)
218     return NULL;
219   size_t found;
220   const char kTargetPeerIdParam[] = "to=";
221   do {
222     found = path.find(kTargetPeerIdParam, args);
223     if (found == std::string::npos)
224       return NULL;
225     if (found == (args + 1) || path[found - 1] == '&') {
226       found += ARRAYSIZE(kTargetPeerIdParam) - 1;
227       break;
228     }
229     args = found + ARRAYSIZE(kTargetPeerIdParam) - 1;
230   } while (true);
231   int id = atoi(&path[found]);
232   Members::const_iterator i = members_.begin();
233   for (; i != members_.end(); ++i) {
234     if ((*i)->id() == id) {
235       return *i;
236     }
237   }
238   return NULL;
239 }
240 
AddMember(DataSocket * ds)241 bool PeerChannel::AddMember(DataSocket* ds) {
242   assert(IsPeerConnection(ds));
243   ChannelMember* new_guy = new ChannelMember(ds);
244   Members failures;
245   BroadcastChangedState(*new_guy, &failures);
246   HandleDeliveryFailures(&failures);
247   members_.push_back(new_guy);
248 
249   printf("New member added (total=%s): %s\n",
250          size_t2str(members_.size()).c_str(), new_guy->name().c_str());
251 
252   // Let the newly connected peer know about other members of the channel.
253   std::string content_type;
254   std::string response = BuildResponseForNewMember(*new_guy, &content_type);
255   ds->Send("200 Added", true, content_type, new_guy->GetPeerIdHeader(),
256            response);
257   return true;
258 }
259 
CloseAll()260 void PeerChannel::CloseAll() {
261   Members::const_iterator i = members_.begin();
262   for (; i != members_.end(); ++i) {
263     (*i)->QueueResponse("200 OK", "text/plain", "", "Server shutting down");
264   }
265   DeleteAll();
266 }
267 
OnClosing(DataSocket * ds)268 void PeerChannel::OnClosing(DataSocket* ds) {
269   for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
270     ChannelMember* m = (*i);
271     m->OnClosing(ds);
272     if (!m->connected()) {
273       i = members_.erase(i);
274       Members failures;
275       BroadcastChangedState(*m, &failures);
276       HandleDeliveryFailures(&failures);
277       delete m;
278       if (i == members_.end())
279         break;
280     }
281   }
282   printf("Total connected: %s\n", size_t2str(members_.size()).c_str());
283 }
284 
CheckForTimeout()285 void PeerChannel::CheckForTimeout() {
286   for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
287     ChannelMember* m = (*i);
288     if (m->TimedOut()) {
289       printf("Timeout: %s\n", m->name().c_str());
290       m->set_disconnected();
291       i = members_.erase(i);
292       Members failures;
293       BroadcastChangedState(*m, &failures);
294       HandleDeliveryFailures(&failures);
295       delete m;
296       if (i == members_.end())
297         break;
298     }
299   }
300 }
301 
DeleteAll()302 void PeerChannel::DeleteAll() {
303   for (Members::iterator i = members_.begin(); i != members_.end(); ++i)
304     delete (*i);
305   members_.clear();
306 }
307 
BroadcastChangedState(const ChannelMember & member,Members * delivery_failures)308 void PeerChannel::BroadcastChangedState(const ChannelMember& member,
309                                         Members* delivery_failures) {
310   // This function should be called prior to DataSocket::Close().
311   assert(delivery_failures);
312 
313   if (!member.connected()) {
314     printf("Member disconnected: %s\n", member.name().c_str());
315   }
316 
317   Members::iterator i = members_.begin();
318   for (; i != members_.end(); ++i) {
319     if (&member != (*i)) {
320       if (!(*i)->NotifyOfOtherMember(member)) {
321         (*i)->set_disconnected();
322         delivery_failures->push_back(*i);
323         i = members_.erase(i);
324         if (i == members_.end())
325           break;
326       }
327     }
328   }
329 }
330 
HandleDeliveryFailures(Members * failures)331 void PeerChannel::HandleDeliveryFailures(Members* failures) {
332   assert(failures);
333 
334   while (!failures->empty()) {
335     Members::iterator i = failures->begin();
336     ChannelMember* member = *i;
337     assert(!member->connected());
338     failures->erase(i);
339     BroadcastChangedState(*member, failures);
340     delete member;
341   }
342 }
343 
344 // Builds a simple list of "name,id\n" entries for each member.
BuildResponseForNewMember(const ChannelMember & member,std::string * content_type)345 std::string PeerChannel::BuildResponseForNewMember(const ChannelMember& member,
346                                                    std::string* content_type) {
347   assert(content_type);
348 
349   *content_type = "text/plain";
350   // The peer itself will always be the first entry.
351   std::string response(member.GetEntry());
352   for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
353     if (member.id() != (*i)->id()) {
354       assert((*i)->connected());
355       response += (*i)->GetEntry();
356     }
357   }
358 
359   return response;
360 }
361