1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include "talk/p2p/base/transport.h"
29
30 #include "talk/base/common.h"
31 #include "talk/base/logging.h"
32 #include "talk/p2p/base/candidate.h"
33 #include "talk/p2p/base/constants.h"
34 #include "talk/p2p/base/sessionmanager.h"
35 #include "talk/p2p/base/parsing.h"
36 #include "talk/p2p/base/transportchannelimpl.h"
37 #include "talk/xmllite/xmlelement.h"
38 #include "talk/xmpp/constants.h"
39
40 namespace cricket {
41
42 struct ChannelParams {
ChannelParamscricket::ChannelParams43 ChannelParams() : channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams44 explicit ChannelParams(const std::string& name)
45 : name(name), channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams46 ChannelParams(const std::string& name,
47 const std::string& content_type)
48 : name(name), content_type(content_type),
49 channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams50 explicit ChannelParams(cricket::Candidate* candidate) :
51 channel(NULL), candidate(candidate) {
52 name = candidate->name();
53 }
54
~ChannelParamscricket::ChannelParams55 ~ChannelParams() {
56 delete candidate;
57 }
58
59 std::string name;
60 std::string content_type;
61 cricket::TransportChannelImpl* channel;
62 cricket::Candidate* candidate;
63 };
64 typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage;
65
// Ids of the messages Transport delivers to itself through OnMessage().  The
// trailing comment names the handler each id is dispatched to.
enum {
  // Dispatched to *_w handlers (these assert the worker thread).
  MSG_CREATECHANNEL = 1,             // CreateChannel_w
  MSG_DESTROYCHANNEL = 2,            // DestroyChannel_w
  MSG_DESTROYALLCHANNELS = 3,        // DestroyAllChannels_w
  MSG_CONNECTCHANNELS = 4,           // ConnectChannels_w
  MSG_RESETCHANNELS = 5,             // ResetChannels_w
  MSG_ONSIGNALINGREADY = 6,          // CallChannels_w(OnSignalingReady)
  MSG_ONREMOTECANDIDATE = 7,         // OnRemoteCandidate_w
  // Dispatched to *_s handlers (these assert the signaling thread).
  MSG_READSTATE = 8,                 // OnChannelReadableState_s
  MSG_WRITESTATE = 9,                // OnChannelWritableState_s
  MSG_REQUESTSIGNALING = 10,         // OnChannelRequestSignaling_s
  MSG_ONCHANNELCANDIDATEREADY = 11,  // OnChannelCandidateReady_s
  MSG_CONNECTING = 12,               // OnConnecting_s
};
80
Transport(talk_base::Thread * signaling_thread,talk_base::Thread * worker_thread,const std::string & type,PortAllocator * allocator)81 Transport::Transport(talk_base::Thread* signaling_thread,
82 talk_base::Thread* worker_thread,
83 const std::string& type,
84 PortAllocator* allocator)
85 : signaling_thread_(signaling_thread),
86 worker_thread_(worker_thread), type_(type), allocator_(allocator),
87 destroyed_(false), readable_(false), writable_(false),
88 connect_requested_(false), allow_local_ips_(false) {
89 }
90
~Transport()91 Transport::~Transport() {
92 ASSERT(signaling_thread_->IsCurrent());
93 ASSERT(destroyed_);
94 }
95
CreateChannel(const std::string & name,const std::string & content_type)96 TransportChannelImpl* Transport::CreateChannel(
97 const std::string& name, const std::string& content_type) {
98 ChannelParams params(name, content_type);
99 ChannelMessage msg(¶ms);
100 worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
101 return msg.data()->channel;
102 }
103
CreateChannel_w(const std::string & name,const std::string & content_type)104 TransportChannelImpl* Transport::CreateChannel_w(
105 const std::string& name, const std::string& content_type) {
106 ASSERT(worker_thread()->IsCurrent());
107
108 TransportChannelImpl* impl = CreateTransportChannel(name, content_type);
109 impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
110 impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
111 impl->SignalRequestSignaling.connect(
112 this, &Transport::OnChannelRequestSignaling);
113 impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
114
115 talk_base::CritScope cs(&crit_);
116 ASSERT(channels_.find(name) == channels_.end());
117 channels_[name] = impl;
118 destroyed_ = false;
119 if (connect_requested_) {
120 impl->Connect();
121 if (channels_.size() == 1) {
122 // If this is the first channel, then indicate that we have started
123 // connecting.
124 signaling_thread()->Post(this, MSG_CONNECTING, NULL);
125 }
126 }
127 return impl;
128 }
129
GetChannel(const std::string & name)130 TransportChannelImpl* Transport::GetChannel(const std::string& name) {
131 talk_base::CritScope cs(&crit_);
132 ChannelMap::iterator iter = channels_.find(name);
133 return (iter != channels_.end()) ? iter->second : NULL;
134 }
135
HasChannels()136 bool Transport::HasChannels() {
137 talk_base::CritScope cs(&crit_);
138 return !channels_.empty();
139 }
140
DestroyChannel(const std::string & name)141 void Transport::DestroyChannel(const std::string& name) {
142 ChannelParams params(name);
143 ChannelMessage msg(¶ms);
144 worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
145 }
146
DestroyChannel_w(const std::string & name)147 void Transport::DestroyChannel_w(const std::string& name) {
148 ASSERT(worker_thread()->IsCurrent());
149
150 TransportChannelImpl* impl = NULL;
151 {
152 talk_base::CritScope cs(&crit_);
153 ChannelMap::iterator iter = channels_.find(name);
154 if (iter == channels_.end())
155 return;
156 impl = iter->second;
157 channels_.erase(iter);
158 }
159
160 if (connect_requested_ && channels_.empty()) {
161 // We're no longer attempting to connect.
162 signaling_thread()->Post(this, MSG_CONNECTING, NULL);
163 }
164
165 if (impl) {
166 // Check in case the deleted channel was the only non-writable channel.
167 OnChannelWritableState(impl);
168 DestroyTransportChannel(impl);
169 }
170 }
171
ConnectChannels()172 void Transport::ConnectChannels() {
173 ASSERT(signaling_thread()->IsCurrent());
174 worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
175 }
176
ConnectChannels_w()177 void Transport::ConnectChannels_w() {
178 ASSERT(worker_thread()->IsCurrent());
179 if (connect_requested_ || channels_.empty())
180 return;
181 connect_requested_ = true;
182 signaling_thread()->Post(
183 this, MSG_ONCHANNELCANDIDATEREADY, NULL);
184 CallChannels_w(&TransportChannelImpl::Connect);
185 if (!channels_.empty()) {
186 signaling_thread()->Post(this, MSG_CONNECTING, NULL);
187 }
188 }
189
OnConnecting_s()190 void Transport::OnConnecting_s() {
191 ASSERT(signaling_thread()->IsCurrent());
192 SignalConnecting(this);
193 }
194
DestroyAllChannels()195 void Transport::DestroyAllChannels() {
196 ASSERT(signaling_thread()->IsCurrent());
197 worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
198 worker_thread()->Clear(this);
199 signaling_thread()->Clear(this);
200 destroyed_ = true;
201 }
202
DestroyAllChannels_w()203 void Transport::DestroyAllChannels_w() {
204 ASSERT(worker_thread()->IsCurrent());
205 std::vector<TransportChannelImpl*> impls;
206 {
207 talk_base::CritScope cs(&crit_);
208 for (ChannelMap::iterator iter = channels_.begin();
209 iter != channels_.end();
210 ++iter) {
211 impls.push_back(iter->second);
212 }
213 channels_.clear();
214 }
215
216 for (size_t i = 0; i < impls.size(); ++i)
217 DestroyTransportChannel(impls[i]);
218 }
219
ResetChannels()220 void Transport::ResetChannels() {
221 ASSERT(signaling_thread()->IsCurrent());
222 worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
223 }
224
ResetChannels_w()225 void Transport::ResetChannels_w() {
226 ASSERT(worker_thread()->IsCurrent());
227
228 // We are no longer attempting to connect
229 connect_requested_ = false;
230
231 // Clear out the old messages, they aren't relevant
232 talk_base::CritScope cs(&crit_);
233 ready_candidates_.clear();
234
235 // Reset all of the channels
236 CallChannels_w(&TransportChannelImpl::Reset);
237 }
238
OnSignalingReady()239 void Transport::OnSignalingReady() {
240 ASSERT(signaling_thread()->IsCurrent());
241 if (destroyed_) return;
242
243 worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
244
245 // Notify the subclass.
246 OnTransportSignalingReady();
247 }
248
CallChannels_w(TransportChannelFunc func)249 void Transport::CallChannels_w(TransportChannelFunc func) {
250 ASSERT(worker_thread()->IsCurrent());
251 talk_base::CritScope cs(&crit_);
252 for (ChannelMap::iterator iter = channels_.begin();
253 iter != channels_.end();
254 ++iter) {
255 ((iter->second)->*func)();
256 }
257 }
258
VerifyCandidate(const Candidate & cand,ParseError * error)259 bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
260 if (cand.address().IsLocalIP() && !allow_local_ips_)
261 return BadParse("candidate has local IP address", error);
262
263 // No address zero.
264 if (cand.address().IsAny()) {
265 return BadParse("candidate has address of zero", error);
266 }
267
268 // Disallow all ports below 1024, except for 80 and 443 on public addresses.
269 int port = cand.address().port();
270 if (port < 1024) {
271 if ((port != 80) && (port != 443))
272 return BadParse(
273 "candidate has port below 1024, but not 80 or 443", error);
274 if (cand.address().IsPrivateIP()) {
275 return BadParse(
276 "candidate has port of 80 or 443 with private IP address", error);
277 }
278 }
279
280 return true;
281 }
282
OnRemoteCandidates(const std::vector<Candidate> & candidates)283 void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
284 for (std::vector<Candidate>::const_iterator iter = candidates.begin();
285 iter != candidates.end();
286 ++iter) {
287 OnRemoteCandidate(*iter);
288 }
289 }
290
OnRemoteCandidate(const Candidate & candidate)291 void Transport::OnRemoteCandidate(const Candidate& candidate) {
292 ASSERT(signaling_thread()->IsCurrent());
293 if (destroyed_) return;
294 if (!HasChannel(candidate.name())) {
295 LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
296 << candidate.name();
297 return;
298 }
299
300 // new candidate deleted when params is deleted
301 ChannelParams* params = new ChannelParams(new Candidate(candidate));
302 ChannelMessage* msg = new ChannelMessage(params);
303 worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
304 }
305
OnRemoteCandidate_w(const Candidate & candidate)306 void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
307 ASSERT(worker_thread()->IsCurrent());
308 ChannelMap::iterator iter = channels_.find(candidate.name());
309 // It's ok for a channel to go away while this message is in transit.
310 if (iter != channels_.end()) {
311 iter->second->OnCandidate(candidate);
312 }
313 }
314
OnChannelReadableState(TransportChannel * channel)315 void Transport::OnChannelReadableState(TransportChannel* channel) {
316 ASSERT(worker_thread()->IsCurrent());
317 signaling_thread()->Post(this, MSG_READSTATE, NULL);
318 }
319
OnChannelReadableState_s()320 void Transport::OnChannelReadableState_s() {
321 ASSERT(signaling_thread()->IsCurrent());
322 bool readable = GetTransportState_s(true);
323 if (readable_ != readable) {
324 readable_ = readable;
325 SignalReadableState(this);
326 }
327 }
328
OnChannelWritableState(TransportChannel * channel)329 void Transport::OnChannelWritableState(TransportChannel* channel) {
330 ASSERT(worker_thread()->IsCurrent());
331 signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
332 }
333
OnChannelWritableState_s()334 void Transport::OnChannelWritableState_s() {
335 ASSERT(signaling_thread()->IsCurrent());
336 bool writable = GetTransportState_s(false);
337 if (writable_ != writable) {
338 writable_ = writable;
339 SignalWritableState(this);
340 }
341 }
342
GetTransportState_s(bool read)343 bool Transport::GetTransportState_s(bool read) {
344 ASSERT(signaling_thread()->IsCurrent());
345 bool result = false;
346 talk_base::CritScope cs(&crit_);
347 for (ChannelMap::iterator iter = channels_.begin();
348 iter != channels_.end();
349 ++iter) {
350 bool b = (read ? iter->second->readable() : iter->second->writable());
351 result = result || b;
352 }
353 return result;
354 }
355
OnChannelRequestSignaling()356 void Transport::OnChannelRequestSignaling() {
357 ASSERT(worker_thread()->IsCurrent());
358 signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);
359 }
360
OnChannelRequestSignaling_s()361 void Transport::OnChannelRequestSignaling_s() {
362 ASSERT(signaling_thread()->IsCurrent());
363 SignalRequestSignaling(this);
364 }
365
OnChannelCandidateReady(TransportChannelImpl * channel,const Candidate & candidate)366 void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
367 const Candidate& candidate) {
368 ASSERT(worker_thread()->IsCurrent());
369 talk_base::CritScope cs(&crit_);
370 ready_candidates_.push_back(candidate);
371
372 // We hold any messages until the client lets us connect.
373 if (connect_requested_) {
374 signaling_thread()->Post(
375 this, MSG_ONCHANNELCANDIDATEREADY, NULL);
376 }
377 }
378
OnChannelCandidateReady_s()379 void Transport::OnChannelCandidateReady_s() {
380 ASSERT(signaling_thread()->IsCurrent());
381 ASSERT(connect_requested_);
382
383 std::vector<Candidate> candidates;
384 {
385 talk_base::CritScope cs(&crit_);
386 candidates.swap(ready_candidates_);
387 }
388
389 // we do the deleting of Candidate* here to keep the new above and
390 // delete below close to each other
391 if (!candidates.empty()) {
392 SignalCandidatesReady(this, candidates);
393 }
394 }
395
OnMessage(talk_base::Message * msg)396 void Transport::OnMessage(talk_base::Message* msg) {
397 switch (msg->message_id) {
398 case MSG_CREATECHANNEL:
399 {
400 ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
401 params->channel = CreateChannel_w(params->name, params->content_type);
402 }
403 break;
404 case MSG_DESTROYCHANNEL:
405 {
406 ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
407 DestroyChannel_w(params->name);
408 }
409 break;
410 case MSG_CONNECTCHANNELS:
411 ConnectChannels_w();
412 break;
413 case MSG_RESETCHANNELS:
414 ResetChannels_w();
415 break;
416 case MSG_DESTROYALLCHANNELS:
417 DestroyAllChannels_w();
418 break;
419 case MSG_ONSIGNALINGREADY:
420 CallChannels_w(&TransportChannelImpl::OnSignalingReady);
421 break;
422 case MSG_ONREMOTECANDIDATE:
423 {
424 ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
425 ChannelParams* params = channel_msg->data();
426 OnRemoteCandidate_w(*(params->candidate));
427 delete params;
428 delete channel_msg;
429 }
430 break;
431 case MSG_CONNECTING:
432 OnConnecting_s();
433 break;
434 case MSG_READSTATE:
435 OnChannelReadableState_s();
436 break;
437 case MSG_WRITESTATE:
438 OnChannelWritableState_s();
439 break;
440 case MSG_REQUESTSIGNALING:
441 OnChannelRequestSignaling_s();
442 break;
443 case MSG_ONCHANNELCANDIDATEREADY:
444 OnChannelCandidateReady_s();
445 break;
446 }
447 }
448
ParseAddress(const buzz::XmlElement * elem,const buzz::QName & address_name,const buzz::QName & port_name,talk_base::SocketAddress * address,ParseError * error)449 bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
450 const buzz::QName& address_name,
451 const buzz::QName& port_name,
452 talk_base::SocketAddress* address,
453 ParseError* error) {
454 if (!elem->HasAttr(address_name))
455 return BadParse("address does not have " + address_name.LocalPart(), error);
456 if (!elem->HasAttr(port_name))
457 return BadParse("address does not have " + port_name.LocalPart(), error);
458
459 address->SetIP(elem->Attr(address_name));
460 std::istringstream ist(elem->Attr(port_name));
461 int port = 0;
462 ist >> port;
463 address->SetPort(port);
464
465 return true;
466 }
467
468 } // namespace cricket
469