• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/p2p/base/transport.h"
29 
30 #include "talk/base/common.h"
31 #include "talk/base/logging.h"
32 #include "talk/p2p/base/candidate.h"
33 #include "talk/p2p/base/constants.h"
34 #include "talk/p2p/base/sessionmanager.h"
35 #include "talk/p2p/base/parsing.h"
36 #include "talk/p2p/base/transportchannelimpl.h"
37 #include "talk/xmllite/xmlelement.h"
38 #include "talk/xmpp/constants.h"
39 
40 namespace cricket {
41 
42 struct ChannelParams {
ChannelParamscricket::ChannelParams43   ChannelParams() : channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams44   explicit ChannelParams(const std::string& name)
45       : name(name), channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams46   ChannelParams(const std::string& name,
47                 const std::string& content_type)
48       : name(name), content_type(content_type),
49         channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams50   explicit ChannelParams(cricket::Candidate* candidate) :
51       channel(NULL), candidate(candidate) {
52     name = candidate->name();
53   }
54 
~ChannelParamscricket::ChannelParams55   ~ChannelParams() {
56     delete candidate;
57   }
58 
59   std::string name;
60   std::string content_type;
61   cricket::TransportChannelImpl* channel;
62   cricket::Candidate* candidate;
63 };
64 typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage;
65 
// Message ids dispatched by Transport::OnMessage.
enum {
  // Sent or posted to the worker thread.
  MSG_CREATECHANNEL = 1,
  MSG_DESTROYCHANNEL = 2,
  MSG_DESTROYALLCHANNELS = 3,
  MSG_CONNECTCHANNELS = 4,
  MSG_RESETCHANNELS = 5,
  MSG_ONSIGNALINGREADY = 6,
  MSG_ONREMOTECANDIDATE = 7,

  // Posted to the signaling thread.
  MSG_READSTATE = 8,
  MSG_WRITESTATE = 9,
  MSG_REQUESTSIGNALING = 10,
  MSG_ONCHANNELCANDIDATEREADY = 11,
  MSG_CONNECTING = 12,
};
80 
Transport(talk_base::Thread * signaling_thread,talk_base::Thread * worker_thread,const std::string & type,PortAllocator * allocator)81 Transport::Transport(talk_base::Thread* signaling_thread,
82                      talk_base::Thread* worker_thread,
83                      const std::string& type,
84                      PortAllocator* allocator)
85   : signaling_thread_(signaling_thread),
86     worker_thread_(worker_thread), type_(type), allocator_(allocator),
87     destroyed_(false), readable_(false), writable_(false),
88     connect_requested_(false), allow_local_ips_(false) {
89 }
90 
~Transport()91 Transport::~Transport() {
92   ASSERT(signaling_thread_->IsCurrent());
93   ASSERT(destroyed_);
94 }
95 
CreateChannel(const std::string & name,const std::string & content_type)96 TransportChannelImpl* Transport::CreateChannel(
97     const std::string& name, const std::string& content_type) {
98   ChannelParams params(name, content_type);
99   ChannelMessage msg(&params);
100   worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
101   return msg.data()->channel;
102 }
103 
CreateChannel_w(const std::string & name,const std::string & content_type)104 TransportChannelImpl* Transport::CreateChannel_w(
105     const std::string& name, const std::string& content_type) {
106   ASSERT(worker_thread()->IsCurrent());
107 
108   TransportChannelImpl* impl = CreateTransportChannel(name, content_type);
109   impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
110   impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
111   impl->SignalRequestSignaling.connect(
112       this, &Transport::OnChannelRequestSignaling);
113   impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
114 
115   talk_base::CritScope cs(&crit_);
116   ASSERT(channels_.find(name) == channels_.end());
117   channels_[name] = impl;
118   destroyed_ = false;
119   if (connect_requested_) {
120     impl->Connect();
121     if (channels_.size() == 1) {
122       // If this is the first channel, then indicate that we have started
123       // connecting.
124       signaling_thread()->Post(this, MSG_CONNECTING, NULL);
125     }
126   }
127   return impl;
128 }
129 
GetChannel(const std::string & name)130 TransportChannelImpl* Transport::GetChannel(const std::string& name) {
131   talk_base::CritScope cs(&crit_);
132   ChannelMap::iterator iter = channels_.find(name);
133   return (iter != channels_.end()) ? iter->second : NULL;
134 }
135 
HasChannels()136 bool Transport::HasChannels() {
137   talk_base::CritScope cs(&crit_);
138   return !channels_.empty();
139 }
140 
DestroyChannel(const std::string & name)141 void Transport::DestroyChannel(const std::string& name) {
142   ChannelParams params(name);
143   ChannelMessage msg(&params);
144   worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
145 }
146 
DestroyChannel_w(const std::string & name)147 void Transport::DestroyChannel_w(const std::string& name) {
148   ASSERT(worker_thread()->IsCurrent());
149 
150   TransportChannelImpl* impl = NULL;
151   {
152     talk_base::CritScope cs(&crit_);
153     ChannelMap::iterator iter = channels_.find(name);
154     if (iter == channels_.end())
155       return;
156     impl = iter->second;
157     channels_.erase(iter);
158   }
159 
160   if (connect_requested_ && channels_.empty()) {
161     // We're no longer attempting to connect.
162     signaling_thread()->Post(this, MSG_CONNECTING, NULL);
163   }
164 
165   if (impl) {
166     // Check in case the deleted channel was the only non-writable channel.
167     OnChannelWritableState(impl);
168     DestroyTransportChannel(impl);
169   }
170 }
171 
ConnectChannels()172 void Transport::ConnectChannels() {
173   ASSERT(signaling_thread()->IsCurrent());
174   worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
175 }
176 
ConnectChannels_w()177 void Transport::ConnectChannels_w() {
178   ASSERT(worker_thread()->IsCurrent());
179   if (connect_requested_ || channels_.empty())
180     return;
181   connect_requested_ = true;
182   signaling_thread()->Post(
183       this, MSG_ONCHANNELCANDIDATEREADY, NULL);
184   CallChannels_w(&TransportChannelImpl::Connect);
185   if (!channels_.empty()) {
186     signaling_thread()->Post(this, MSG_CONNECTING, NULL);
187   }
188 }
189 
OnConnecting_s()190 void Transport::OnConnecting_s() {
191   ASSERT(signaling_thread()->IsCurrent());
192   SignalConnecting(this);
193 }
194 
DestroyAllChannels()195 void Transport::DestroyAllChannels() {
196   ASSERT(signaling_thread()->IsCurrent());
197   worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
198   worker_thread()->Clear(this);
199   signaling_thread()->Clear(this);
200   destroyed_ = true;
201 }
202 
DestroyAllChannels_w()203 void Transport::DestroyAllChannels_w() {
204   ASSERT(worker_thread()->IsCurrent());
205   std::vector<TransportChannelImpl*> impls;
206   {
207     talk_base::CritScope cs(&crit_);
208     for (ChannelMap::iterator iter = channels_.begin();
209          iter != channels_.end();
210          ++iter) {
211       impls.push_back(iter->second);
212     }
213     channels_.clear();
214   }
215 
216   for (size_t i = 0; i < impls.size(); ++i)
217     DestroyTransportChannel(impls[i]);
218 }
219 
ResetChannels()220 void Transport::ResetChannels() {
221   ASSERT(signaling_thread()->IsCurrent());
222   worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
223 }
224 
ResetChannels_w()225 void Transport::ResetChannels_w() {
226   ASSERT(worker_thread()->IsCurrent());
227 
228   // We are no longer attempting to connect
229   connect_requested_ = false;
230 
231   // Clear out the old messages, they aren't relevant
232   talk_base::CritScope cs(&crit_);
233   ready_candidates_.clear();
234 
235   // Reset all of the channels
236   CallChannels_w(&TransportChannelImpl::Reset);
237 }
238 
OnSignalingReady()239 void Transport::OnSignalingReady() {
240   ASSERT(signaling_thread()->IsCurrent());
241   if (destroyed_) return;
242 
243   worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
244 
245   // Notify the subclass.
246   OnTransportSignalingReady();
247 }
248 
CallChannels_w(TransportChannelFunc func)249 void Transport::CallChannels_w(TransportChannelFunc func) {
250   ASSERT(worker_thread()->IsCurrent());
251   talk_base::CritScope cs(&crit_);
252   for (ChannelMap::iterator iter = channels_.begin();
253        iter != channels_.end();
254        ++iter) {
255     ((iter->second)->*func)();
256   }
257 }
258 
VerifyCandidate(const Candidate & cand,ParseError * error)259 bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
260   if (cand.address().IsLocalIP() && !allow_local_ips_)
261     return BadParse("candidate has local IP address", error);
262 
263   // No address zero.
264   if (cand.address().IsAny()) {
265     return BadParse("candidate has address of zero", error);
266   }
267 
268   // Disallow all ports below 1024, except for 80 and 443 on public addresses.
269   int port = cand.address().port();
270   if (port < 1024) {
271     if ((port != 80) && (port != 443))
272       return BadParse(
273           "candidate has port below 1024, but not 80 or 443", error);
274     if (cand.address().IsPrivateIP()) {
275       return BadParse(
276           "candidate has port of 80 or 443 with private IP address", error);
277     }
278   }
279 
280   return true;
281 }
282 
OnRemoteCandidates(const std::vector<Candidate> & candidates)283 void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
284   for (std::vector<Candidate>::const_iterator iter = candidates.begin();
285        iter != candidates.end();
286        ++iter) {
287     OnRemoteCandidate(*iter);
288   }
289 }
290 
OnRemoteCandidate(const Candidate & candidate)291 void Transport::OnRemoteCandidate(const Candidate& candidate) {
292   ASSERT(signaling_thread()->IsCurrent());
293   if (destroyed_) return;
294   if (!HasChannel(candidate.name())) {
295     LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
296                     << candidate.name();
297     return;
298   }
299 
300   // new candidate deleted when params is deleted
301   ChannelParams* params = new ChannelParams(new Candidate(candidate));
302   ChannelMessage* msg = new ChannelMessage(params);
303   worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
304 }
305 
OnRemoteCandidate_w(const Candidate & candidate)306 void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
307   ASSERT(worker_thread()->IsCurrent());
308   ChannelMap::iterator iter = channels_.find(candidate.name());
309   // It's ok for a channel to go away while this message is in transit.
310   if (iter != channels_.end()) {
311     iter->second->OnCandidate(candidate);
312   }
313 }
314 
OnChannelReadableState(TransportChannel * channel)315 void Transport::OnChannelReadableState(TransportChannel* channel) {
316   ASSERT(worker_thread()->IsCurrent());
317   signaling_thread()->Post(this, MSG_READSTATE, NULL);
318 }
319 
OnChannelReadableState_s()320 void Transport::OnChannelReadableState_s() {
321   ASSERT(signaling_thread()->IsCurrent());
322   bool readable = GetTransportState_s(true);
323   if (readable_ != readable) {
324     readable_ = readable;
325     SignalReadableState(this);
326   }
327 }
328 
OnChannelWritableState(TransportChannel * channel)329 void Transport::OnChannelWritableState(TransportChannel* channel) {
330   ASSERT(worker_thread()->IsCurrent());
331   signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
332 }
333 
OnChannelWritableState_s()334 void Transport::OnChannelWritableState_s() {
335   ASSERT(signaling_thread()->IsCurrent());
336   bool writable = GetTransportState_s(false);
337   if (writable_ != writable) {
338     writable_ = writable;
339     SignalWritableState(this);
340   }
341 }
342 
GetTransportState_s(bool read)343 bool Transport::GetTransportState_s(bool read) {
344   ASSERT(signaling_thread()->IsCurrent());
345   bool result = false;
346   talk_base::CritScope cs(&crit_);
347   for (ChannelMap::iterator iter = channels_.begin();
348        iter != channels_.end();
349        ++iter) {
350     bool b = (read ? iter->second->readable() : iter->second->writable());
351     result = result || b;
352   }
353   return result;
354 }
355 
OnChannelRequestSignaling()356 void Transport::OnChannelRequestSignaling() {
357   ASSERT(worker_thread()->IsCurrent());
358   signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);
359 }
360 
OnChannelRequestSignaling_s()361 void Transport::OnChannelRequestSignaling_s() {
362   ASSERT(signaling_thread()->IsCurrent());
363   SignalRequestSignaling(this);
364 }
365 
OnChannelCandidateReady(TransportChannelImpl * channel,const Candidate & candidate)366 void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
367                                         const Candidate& candidate) {
368   ASSERT(worker_thread()->IsCurrent());
369   talk_base::CritScope cs(&crit_);
370   ready_candidates_.push_back(candidate);
371 
372   // We hold any messages until the client lets us connect.
373   if (connect_requested_) {
374     signaling_thread()->Post(
375         this, MSG_ONCHANNELCANDIDATEREADY, NULL);
376   }
377 }
378 
OnChannelCandidateReady_s()379 void Transport::OnChannelCandidateReady_s() {
380   ASSERT(signaling_thread()->IsCurrent());
381   ASSERT(connect_requested_);
382 
383   std::vector<Candidate> candidates;
384   {
385     talk_base::CritScope cs(&crit_);
386     candidates.swap(ready_candidates_);
387   }
388 
389   // we do the deleting of Candidate* here to keep the new above and
390   // delete below close to each other
391   if (!candidates.empty()) {
392     SignalCandidatesReady(this, candidates);
393   }
394 }
395 
OnMessage(talk_base::Message * msg)396 void Transport::OnMessage(talk_base::Message* msg) {
397   switch (msg->message_id) {
398   case MSG_CREATECHANNEL:
399     {
400       ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
401       params->channel = CreateChannel_w(params->name, params->content_type);
402     }
403     break;
404   case MSG_DESTROYCHANNEL:
405     {
406       ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
407       DestroyChannel_w(params->name);
408     }
409     break;
410   case MSG_CONNECTCHANNELS:
411     ConnectChannels_w();
412     break;
413   case MSG_RESETCHANNELS:
414     ResetChannels_w();
415     break;
416   case MSG_DESTROYALLCHANNELS:
417     DestroyAllChannels_w();
418     break;
419   case MSG_ONSIGNALINGREADY:
420     CallChannels_w(&TransportChannelImpl::OnSignalingReady);
421     break;
422   case MSG_ONREMOTECANDIDATE:
423     {
424       ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
425       ChannelParams* params = channel_msg->data();
426       OnRemoteCandidate_w(*(params->candidate));
427       delete params;
428       delete channel_msg;
429     }
430     break;
431   case MSG_CONNECTING:
432     OnConnecting_s();
433     break;
434   case MSG_READSTATE:
435     OnChannelReadableState_s();
436     break;
437   case MSG_WRITESTATE:
438     OnChannelWritableState_s();
439     break;
440   case MSG_REQUESTSIGNALING:
441     OnChannelRequestSignaling_s();
442     break;
443   case MSG_ONCHANNELCANDIDATEREADY:
444     OnChannelCandidateReady_s();
445     break;
446   }
447 }
448 
ParseAddress(const buzz::XmlElement * elem,const buzz::QName & address_name,const buzz::QName & port_name,talk_base::SocketAddress * address,ParseError * error)449 bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
450                                    const buzz::QName& address_name,
451                                    const buzz::QName& port_name,
452                                    talk_base::SocketAddress* address,
453                                    ParseError* error) {
454   if (!elem->HasAttr(address_name))
455     return BadParse("address does not have " + address_name.LocalPart(), error);
456   if (!elem->HasAttr(port_name))
457     return BadParse("address does not have " + port_name.LocalPart(), error);
458 
459   address->SetIP(elem->Attr(address_name));
460   std::istringstream ist(elem->Attr(port_name));
461   int port = 0;
462   ist >> port;
463   address->SetPort(port);
464 
465   return true;
466 }
467 
468 }  // namespace cricket
469