1 /*
2 * Copyright 2011 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "examples/peerconnection/server/peer_channel.h"
12
13 #include <stdio.h>
14 #include <stdlib.h>
15
16 #include <algorithm>
17
18 #include "examples/peerconnection/server/data_socket.h"
19 #include "examples/peerconnection/server/utils.h"
20 #include "rtc_base/checks.h"
21
// Name of the HTTP header that carries a peer id in responses.
//
// For peer-to-peer messages the value is the id of the sending peer. For
// server notifications about other peers' state the value is the id of the
// receiving peer itself.
//
// WORKAROUND: CORS support differs a lot between browsers, so no custom
// header name is used here (this was originally "X-Peer-Id: "). "Pragma" is a
// "simple header" and should therefore always be exposed to CORS requests.
// Access-Control-Expose-Headers exists for exposing proprietary headers, but
// at this point it does not work correctly in some popular browsers.
namespace {
const char kPeerIdHeader[] = "Pragma: ";
}  // namespace
34
// Request paths that refer to an already signed-in peer. The entries are
// indexed by RequestPathIndex, so the two must be kept in the same order.
// Both the pointers and the strings are immutable.
static const char* const kRequestPaths[] = {
    "/wait",
    "/sign_out",
    "/message",
};

// Indices into kRequestPaths.
enum RequestPathIndex {
  kWait,
  kSignOut,
  kMessage,
};

// Catch a path being added to one list but not the other.
static_assert(sizeof(kRequestPaths) / sizeof(kRequestPaths[0]) ==
                  static_cast<size_t>(kMessage) + 1,
              "kRequestPaths and RequestPathIndex are out of sync");
46
// Maximum number of characters kept from a peer's requested name. Longer
// names are truncated when the member is created, and GetEntry() sizes its
// formatting buffer from this value.
static const size_t kMaxNameLength = 512;
48
49 //
50 // ChannelMember
51 //
52
53 int ChannelMember::s_member_id_ = 0;
54
ChannelMember(DataSocket * socket)55 ChannelMember::ChannelMember(DataSocket* socket)
56 : waiting_socket_(NULL),
57 id_(++s_member_id_),
58 connected_(true),
59 timestamp_(time(NULL)) {
60 RTC_DCHECK(socket);
61 RTC_DCHECK_EQ(socket->method(), DataSocket::GET);
62 RTC_DCHECK(socket->PathEquals("/sign_in"));
63 name_ = socket->request_arguments();
64 if (name_.empty())
65 name_ = "peer_" + int2str(id_);
66 else if (name_.length() > kMaxNameLength)
67 name_.resize(kMaxNameLength);
68
69 std::replace(name_.begin(), name_.end(), ',', '_');
70 }
71
~ChannelMember()72 ChannelMember::~ChannelMember() {}
73
is_wait_request(DataSocket * ds) const74 bool ChannelMember::is_wait_request(DataSocket* ds) const {
75 return ds && ds->PathEquals(kRequestPaths[kWait]);
76 }
77
TimedOut()78 bool ChannelMember::TimedOut() {
79 return waiting_socket_ == NULL && (time(NULL) - timestamp_) > 30;
80 }
81
GetPeerIdHeader() const82 std::string ChannelMember::GetPeerIdHeader() const {
83 std::string ret(kPeerIdHeader + int2str(id_) + "\r\n");
84 return ret;
85 }
86
NotifyOfOtherMember(const ChannelMember & other)87 bool ChannelMember::NotifyOfOtherMember(const ChannelMember& other) {
88 RTC_DCHECK_NE(&other, this);
89 QueueResponse("200 OK", "text/plain", GetPeerIdHeader(), other.GetEntry());
90 return true;
91 }
92
93 // Returns a string in the form "name,id,connected\n".
GetEntry() const94 std::string ChannelMember::GetEntry() const {
95 RTC_DCHECK(name_.length() <= kMaxNameLength);
96
97 // name, 11-digit int, 1-digit bool, newline, null
98 char entry[kMaxNameLength + 15];
99 snprintf(entry, sizeof(entry), "%s,%d,%d\n",
100 name_.substr(0, kMaxNameLength).c_str(), id_, connected_);
101 return entry;
102 }
103
ForwardRequestToPeer(DataSocket * ds,ChannelMember * peer)104 void ChannelMember::ForwardRequestToPeer(DataSocket* ds, ChannelMember* peer) {
105 RTC_DCHECK(peer);
106 RTC_DCHECK(ds);
107
108 std::string extra_headers(GetPeerIdHeader());
109
110 if (peer == this) {
111 ds->Send("200 OK", true, ds->content_type(), extra_headers, ds->data());
112 } else {
113 printf("Client %s sending to %s\n", name_.c_str(), peer->name().c_str());
114 peer->QueueResponse("200 OK", ds->content_type(), extra_headers,
115 ds->data());
116 ds->Send("200 OK", true, "text/plain", "", "");
117 }
118 }
119
OnClosing(DataSocket * ds)120 void ChannelMember::OnClosing(DataSocket* ds) {
121 if (ds == waiting_socket_) {
122 waiting_socket_ = NULL;
123 timestamp_ = time(NULL);
124 }
125 }
126
QueueResponse(const std::string & status,const std::string & content_type,const std::string & extra_headers,const std::string & data)127 void ChannelMember::QueueResponse(const std::string& status,
128 const std::string& content_type,
129 const std::string& extra_headers,
130 const std::string& data) {
131 if (waiting_socket_) {
132 RTC_DCHECK(queue_.empty());
133 RTC_DCHECK_EQ(waiting_socket_->method(), DataSocket::GET);
134 bool ok =
135 waiting_socket_->Send(status, true, content_type, extra_headers, data);
136 if (!ok) {
137 printf("Failed to deliver data to waiting socket\n");
138 }
139 waiting_socket_ = NULL;
140 timestamp_ = time(NULL);
141 } else {
142 QueuedResponse qr;
143 qr.status = status;
144 qr.content_type = content_type;
145 qr.extra_headers = extra_headers;
146 qr.data = data;
147 queue_.push(qr);
148 }
149 }
150
SetWaitingSocket(DataSocket * ds)151 void ChannelMember::SetWaitingSocket(DataSocket* ds) {
152 RTC_DCHECK_EQ(ds->method(), DataSocket::GET);
153 if (ds && !queue_.empty()) {
154 RTC_DCHECK(!waiting_socket_);
155 const QueuedResponse& response = queue_.front();
156 ds->Send(response.status, true, response.content_type,
157 response.extra_headers, response.data);
158 queue_.pop();
159 } else {
160 waiting_socket_ = ds;
161 }
162 }
163
164 //
165 // PeerChannel
166 //
167
168 // static
IsPeerConnection(const DataSocket * ds)169 bool PeerChannel::IsPeerConnection(const DataSocket* ds) {
170 RTC_DCHECK(ds);
171 return (ds->method() == DataSocket::POST && ds->content_length() > 0) ||
172 (ds->method() == DataSocket::GET && ds->PathEquals("/sign_in"));
173 }
174
Lookup(DataSocket * ds) const175 ChannelMember* PeerChannel::Lookup(DataSocket* ds) const {
176 RTC_DCHECK(ds);
177
178 if (ds->method() != DataSocket::GET && ds->method() != DataSocket::POST)
179 return NULL;
180
181 size_t i = 0;
182 for (; i < ARRAYSIZE(kRequestPaths); ++i) {
183 if (ds->PathEquals(kRequestPaths[i]))
184 break;
185 }
186
187 if (i == ARRAYSIZE(kRequestPaths))
188 return NULL;
189
190 std::string args(ds->request_arguments());
191 static const char kPeerId[] = "peer_id=";
192 size_t found = args.find(kPeerId);
193 if (found == std::string::npos)
194 return NULL;
195
196 int id = atoi(&args[found + ARRAYSIZE(kPeerId) - 1]);
197 Members::const_iterator iter = members_.begin();
198 for (; iter != members_.end(); ++iter) {
199 if (id == (*iter)->id()) {
200 if (i == kWait)
201 (*iter)->SetWaitingSocket(ds);
202 if (i == kSignOut)
203 (*iter)->set_disconnected();
204 return *iter;
205 }
206 }
207
208 return NULL;
209 }
210
IsTargetedRequest(const DataSocket * ds) const211 ChannelMember* PeerChannel::IsTargetedRequest(const DataSocket* ds) const {
212 RTC_DCHECK(ds);
213 // Regardless of GET or POST, we look for the peer_id parameter
214 // only in the request_path.
215 const std::string& path = ds->request_path();
216 size_t args = path.find('?');
217 if (args == std::string::npos)
218 return NULL;
219 size_t found;
220 const char kTargetPeerIdParam[] = "to=";
221 do {
222 found = path.find(kTargetPeerIdParam, args);
223 if (found == std::string::npos)
224 return NULL;
225 if (found == (args + 1) || path[found - 1] == '&') {
226 found += ARRAYSIZE(kTargetPeerIdParam) - 1;
227 break;
228 }
229 args = found + ARRAYSIZE(kTargetPeerIdParam) - 1;
230 } while (true);
231 int id = atoi(&path[found]);
232 Members::const_iterator i = members_.begin();
233 for (; i != members_.end(); ++i) {
234 if ((*i)->id() == id) {
235 return *i;
236 }
237 }
238 return NULL;
239 }
240
AddMember(DataSocket * ds)241 bool PeerChannel::AddMember(DataSocket* ds) {
242 RTC_DCHECK(IsPeerConnection(ds));
243 ChannelMember* new_guy = new ChannelMember(ds);
244 Members failures;
245 BroadcastChangedState(*new_guy, &failures);
246 HandleDeliveryFailures(&failures);
247 members_.push_back(new_guy);
248
249 printf("New member added (total=%s): %s\n",
250 size_t2str(members_.size()).c_str(), new_guy->name().c_str());
251
252 // Let the newly connected peer know about other members of the channel.
253 std::string content_type;
254 std::string response = BuildResponseForNewMember(*new_guy, &content_type);
255 ds->Send("200 Added", true, content_type, new_guy->GetPeerIdHeader(),
256 response);
257 return true;
258 }
259
CloseAll()260 void PeerChannel::CloseAll() {
261 Members::const_iterator i = members_.begin();
262 for (; i != members_.end(); ++i) {
263 (*i)->QueueResponse("200 OK", "text/plain", "", "Server shutting down");
264 }
265 DeleteAll();
266 }
267
OnClosing(DataSocket * ds)268 void PeerChannel::OnClosing(DataSocket* ds) {
269 for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
270 ChannelMember* m = (*i);
271 m->OnClosing(ds);
272 if (!m->connected()) {
273 i = members_.erase(i);
274 Members failures;
275 BroadcastChangedState(*m, &failures);
276 HandleDeliveryFailures(&failures);
277 delete m;
278 if (i == members_.end())
279 break;
280 }
281 }
282 printf("Total connected: %s\n", size_t2str(members_.size()).c_str());
283 }
284
CheckForTimeout()285 void PeerChannel::CheckForTimeout() {
286 for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
287 ChannelMember* m = (*i);
288 if (m->TimedOut()) {
289 printf("Timeout: %s\n", m->name().c_str());
290 m->set_disconnected();
291 i = members_.erase(i);
292 Members failures;
293 BroadcastChangedState(*m, &failures);
294 HandleDeliveryFailures(&failures);
295 delete m;
296 if (i == members_.end())
297 break;
298 }
299 }
300 }
301
DeleteAll()302 void PeerChannel::DeleteAll() {
303 for (Members::iterator i = members_.begin(); i != members_.end(); ++i)
304 delete (*i);
305 members_.clear();
306 }
307
BroadcastChangedState(const ChannelMember & member,Members * delivery_failures)308 void PeerChannel::BroadcastChangedState(const ChannelMember& member,
309 Members* delivery_failures) {
310 // This function should be called prior to DataSocket::Close().
311 RTC_DCHECK(delivery_failures);
312
313 if (!member.connected()) {
314 printf("Member disconnected: %s\n", member.name().c_str());
315 }
316
317 Members::iterator i = members_.begin();
318 for (; i != members_.end(); ++i) {
319 if (&member != (*i)) {
320 if (!(*i)->NotifyOfOtherMember(member)) {
321 (*i)->set_disconnected();
322 delivery_failures->push_back(*i);
323 i = members_.erase(i);
324 if (i == members_.end())
325 break;
326 }
327 }
328 }
329 }
330
HandleDeliveryFailures(Members * failures)331 void PeerChannel::HandleDeliveryFailures(Members* failures) {
332 RTC_DCHECK(failures);
333
334 while (!failures->empty()) {
335 Members::iterator i = failures->begin();
336 ChannelMember* member = *i;
337 RTC_DCHECK(!member->connected());
338 failures->erase(i);
339 BroadcastChangedState(*member, failures);
340 delete member;
341 }
342 }
343
344 // Builds a simple list of "name,id\n" entries for each member.
BuildResponseForNewMember(const ChannelMember & member,std::string * content_type)345 std::string PeerChannel::BuildResponseForNewMember(const ChannelMember& member,
346 std::string* content_type) {
347 RTC_DCHECK(content_type);
348
349 *content_type = "text/plain";
350 // The peer itself will always be the first entry.
351 std::string response(member.GetEntry());
352 for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
353 if (member.id() != (*i)->id()) {
354 RTC_DCHECK((*i)->connected());
355 response += (*i)->GetEntry();
356 }
357 }
358
359 return response;
360 }
361