• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/p2p/base/session.h"
29 
30 #include "talk/p2p/base/dtlstransport.h"
31 #include "talk/p2p/base/p2ptransport.h"
32 #include "talk/p2p/base/sessionclient.h"
33 #include "talk/p2p/base/transport.h"
34 #include "talk/p2p/base/transportchannelproxy.h"
35 #include "talk/p2p/base/transportinfo.h"
36 #include "talk/xmpp/constants.h"
37 #include "talk/xmpp/jid.h"
38 #include "webrtc/base/bind.h"
39 #include "webrtc/base/common.h"
40 #include "webrtc/base/helpers.h"
41 #include "webrtc/base/logging.h"
42 #include "webrtc/base/scoped_ptr.h"
43 #include "webrtc/base/sslstreamadapter.h"
44 
45 #include "talk/p2p/base/constants.h"
46 
47 namespace cricket {
48 
49 using rtc::Bind;
50 
BadMessage(const buzz::QName type,const std::string & text,MessageError * err)51 bool BadMessage(const buzz::QName type,
52                 const std::string& text,
53                 MessageError* err) {
54   err->SetType(type);
55   err->SetText(text);
56   return false;
57 }
58 
~TransportProxy()59 TransportProxy::~TransportProxy() {
60   for (ChannelMap::iterator iter = channels_.begin();
61        iter != channels_.end(); ++iter) {
62     iter->second->SignalDestroyed(iter->second);
63     delete iter->second;
64   }
65 }
66 
type() const67 const std::string& TransportProxy::type() const {
68   return transport_->get()->type();
69 }
70 
GetChannel(int component)71 TransportChannel* TransportProxy::GetChannel(int component) {
72   ASSERT(rtc::Thread::Current() == worker_thread_);
73   return GetChannelProxy(component);
74 }
75 
CreateChannel(const std::string & name,int component)76 TransportChannel* TransportProxy::CreateChannel(
77     const std::string& name, int component) {
78   ASSERT(rtc::Thread::Current() == worker_thread_);
79   ASSERT(GetChannel(component) == NULL);
80   ASSERT(!transport_->get()->HasChannel(component));
81 
82   // We always create a proxy in case we need to change out the transport later.
83   TransportChannelProxy* channel =
84       new TransportChannelProxy(content_name(), name, component);
85   channels_[component] = channel;
86 
87   // If we're already negotiated, create an impl and hook it up to the proxy
88   // channel. If we're connecting, create an impl but don't hook it up yet.
89   if (negotiated_) {
90     SetupChannelProxy_w(component, channel);
91   } else if (connecting_) {
92     GetOrCreateChannelProxyImpl_w(component);
93   }
94   return channel;
95 }
96 
HasChannel(int component)97 bool TransportProxy::HasChannel(int component) {
98   return transport_->get()->HasChannel(component);
99 }
100 
DestroyChannel(int component)101 void TransportProxy::DestroyChannel(int component) {
102   ASSERT(rtc::Thread::Current() == worker_thread_);
103   TransportChannel* channel = GetChannel(component);
104   if (channel) {
105     // If the state of TransportProxy is not NEGOTIATED
106     // then TransportChannelProxy and its impl are not
107     // connected. Both must be connected before
108     // deletion.
109     if (!negotiated_) {
110       SetupChannelProxy_w(component, GetChannelProxy(component));
111     }
112 
113     channels_.erase(component);
114     channel->SignalDestroyed(channel);
115     delete channel;
116   }
117 }
118 
ConnectChannels()119 void TransportProxy::ConnectChannels() {
120   if (!connecting_) {
121     if (!negotiated_) {
122       for (ChannelMap::iterator iter = channels_.begin();
123            iter != channels_.end(); ++iter) {
124         GetOrCreateChannelProxyImpl(iter->first);
125       }
126     }
127     connecting_ = true;
128   }
129   // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
130   // don't have any channels yet, so we need to allow this method to be called
131   // multiple times. Once we fix Transport, we can move this call inside the
132   // if (!connecting_) block.
133   transport_->get()->ConnectChannels();
134 }
135 
CompleteNegotiation()136 void TransportProxy::CompleteNegotiation() {
137   if (!negotiated_) {
138     for (ChannelMap::iterator iter = channels_.begin();
139          iter != channels_.end(); ++iter) {
140       SetupChannelProxy(iter->first, iter->second);
141     }
142     negotiated_ = true;
143   }
144 }
145 
AddSentCandidates(const Candidates & candidates)146 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
147   for (Candidates::const_iterator cand = candidates.begin();
148        cand != candidates.end(); ++cand) {
149     sent_candidates_.push_back(*cand);
150   }
151 }
152 
AddUnsentCandidates(const Candidates & candidates)153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
154   for (Candidates::const_iterator cand = candidates.begin();
155        cand != candidates.end(); ++cand) {
156     unsent_candidates_.push_back(*cand);
157   }
158 }
159 
GetChannelNameFromComponent(int component,std::string * channel_name) const160 bool TransportProxy::GetChannelNameFromComponent(
161     int component, std::string* channel_name) const {
162   const TransportChannelProxy* channel = GetChannelProxy(component);
163   if (channel == NULL) {
164     return false;
165   }
166 
167   *channel_name = channel->name();
168   return true;
169 }
170 
GetComponentFromChannelName(const std::string & channel_name,int * component) const171 bool TransportProxy::GetComponentFromChannelName(
172     const std::string& channel_name, int* component) const {
173   const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
174   if (channel == NULL) {
175     return false;
176   }
177 
178   *component = channel->component();
179   return true;
180 }
181 
GetChannelProxy(int component) const182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
183   ChannelMap::const_iterator iter = channels_.find(component);
184   return (iter != channels_.end()) ? iter->second : NULL;
185 }
186 
GetChannelProxyByName(const std::string & name) const187 TransportChannelProxy* TransportProxy::GetChannelProxyByName(
188     const std::string& name) const {
189   for (ChannelMap::const_iterator iter = channels_.begin();
190        iter != channels_.end();
191        ++iter) {
192     if (iter->second->name() == name) {
193       return iter->second;
194     }
195   }
196   return NULL;
197 }
198 
GetOrCreateChannelProxyImpl(int component)199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
200     int component) {
201   return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
202       &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
203 }
204 
GetOrCreateChannelProxyImpl_w(int component)205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w(
206     int component) {
207   ASSERT(rtc::Thread::Current() == worker_thread_);
208   TransportChannelImpl* impl = transport_->get()->GetChannel(component);
209   if (impl == NULL) {
210     impl = transport_->get()->CreateChannel(component);
211   }
212   return impl;
213 }
214 
SetupChannelProxy(int component,TransportChannelProxy * transproxy)215 void TransportProxy::SetupChannelProxy(
216     int component, TransportChannelProxy* transproxy) {
217   worker_thread_->Invoke<void>(Bind(
218       &TransportProxy::SetupChannelProxy_w, this, component, transproxy));
219 }
220 
SetupChannelProxy_w(int component,TransportChannelProxy * transproxy)221 void TransportProxy::SetupChannelProxy_w(
222     int component, TransportChannelProxy* transproxy) {
223   ASSERT(rtc::Thread::Current() == worker_thread_);
224   TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
225   ASSERT(impl != NULL);
226   transproxy->SetImplementation(impl);
227 }
228 
ReplaceChannelProxyImpl(TransportChannelProxy * proxy,TransportChannelImpl * impl)229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
230                                              TransportChannelImpl* impl) {
231   worker_thread_->Invoke<void>(Bind(
232       &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl));
233 }
234 
ReplaceChannelProxyImpl_w(TransportChannelProxy * proxy,TransportChannelImpl * impl)235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy,
236                                                TransportChannelImpl* impl) {
237   ASSERT(rtc::Thread::Current() == worker_thread_);
238   ASSERT(proxy != NULL);
239   proxy->SetImplementation(impl);
240 }
241 
242 // This function muxes |this| onto |target| by repointing |this| at
243 // |target|'s transport and setting our TransportChannelProxies
244 // to point to |target|'s underlying implementations.
SetupMux(TransportProxy * target)245 bool TransportProxy::SetupMux(TransportProxy* target) {
246   // Bail out if there's nothing to do.
247   if (transport_ == target->transport_) {
248     return true;
249   }
250 
251   // Run through all channels and remove any non-rtp transport channels before
252   // setting target transport channels.
253   for (ChannelMap::const_iterator iter = channels_.begin();
254        iter != channels_.end(); ++iter) {
255     if (!target->transport_->get()->HasChannel(iter->first)) {
256       // Remove if channel doesn't exist in |transport_|.
257       ReplaceChannelProxyImpl(iter->second, NULL);
258     } else {
259       // Replace the impl for all the TransportProxyChannels with the channels
260       // from |target|'s transport. Fail if there's not an exact match.
261       ReplaceChannelProxyImpl(
262           iter->second, target->transport_->get()->CreateChannel(iter->first));
263     }
264   }
265 
266   // Now replace our transport. Must happen afterwards because
267   // it deletes all impls as a side effect.
268   transport_ = target->transport_;
269   transport_->get()->SignalCandidatesReady.connect(
270       this, &TransportProxy::OnTransportCandidatesReady);
271   set_candidates_allocated(target->candidates_allocated());
272   return true;
273 }
274 
SetIceRole(IceRole role)275 void TransportProxy::SetIceRole(IceRole role) {
276   transport_->get()->SetIceRole(role);
277 }
278 
SetLocalTransportDescription(const TransportDescription & description,ContentAction action,std::string * error_desc)279 bool TransportProxy::SetLocalTransportDescription(
280     const TransportDescription& description,
281     ContentAction action,
282     std::string* error_desc) {
283   // If this is an answer, finalize the negotiation.
284   if (action == CA_ANSWER) {
285     CompleteNegotiation();
286   }
287   bool result = transport_->get()->SetLocalTransportDescription(description,
288                                                                 action,
289                                                                 error_desc);
290   if (result)
291     local_description_set_ = true;
292   return result;
293 }
294 
SetRemoteTransportDescription(const TransportDescription & description,ContentAction action,std::string * error_desc)295 bool TransportProxy::SetRemoteTransportDescription(
296     const TransportDescription& description,
297     ContentAction action,
298     std::string* error_desc) {
299   // If this is an answer, finalize the negotiation.
300   if (action == CA_ANSWER) {
301     CompleteNegotiation();
302   }
303   bool result = transport_->get()->SetRemoteTransportDescription(description,
304                                                                  action,
305                                                                  error_desc);
306   if (result)
307     remote_description_set_ = true;
308   return result;
309 }
310 
OnSignalingReady()311 void TransportProxy::OnSignalingReady() {
312   // If we're starting a new allocation sequence, reset our state.
313   set_candidates_allocated(false);
314   transport_->get()->OnSignalingReady();
315 }
316 
OnRemoteCandidates(const Candidates & candidates,std::string * error)317 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
318                                         std::string* error) {
319   // Ensure the transport is negotiated before handling candidates.
320   // TODO(juberti): Remove this once everybody calls SetLocalTD.
321   CompleteNegotiation();
322 
323   // Verify each candidate before passing down to transport layer.
324   for (Candidates::const_iterator cand = candidates.begin();
325        cand != candidates.end(); ++cand) {
326     if (!transport_->get()->VerifyCandidate(*cand, error))
327       return false;
328     if (!HasChannel(cand->component())) {
329       *error = "Candidate has unknown component: " + cand->ToString() +
330                " for content: " + content_name_;
331       return false;
332     }
333   }
334   transport_->get()->OnRemoteCandidates(candidates);
335   return true;
336 }
337 
SetIdentity(rtc::SSLIdentity * identity)338 void TransportProxy::SetIdentity(
339     rtc::SSLIdentity* identity) {
340   transport_->get()->SetIdentity(identity);
341 }
342 
StateToString(State state)343 std::string BaseSession::StateToString(State state) {
344   switch (state) {
345     case Session::STATE_INIT:
346       return "STATE_INIT";
347     case Session::STATE_SENTINITIATE:
348       return "STATE_SENTINITIATE";
349     case Session::STATE_RECEIVEDINITIATE:
350       return "STATE_RECEIVEDINITIATE";
351     case Session::STATE_SENTPRACCEPT:
352       return "STATE_SENTPRACCEPT";
353     case Session::STATE_SENTACCEPT:
354       return "STATE_SENTACCEPT";
355     case Session::STATE_RECEIVEDPRACCEPT:
356       return "STATE_RECEIVEDPRACCEPT";
357     case Session::STATE_RECEIVEDACCEPT:
358       return "STATE_RECEIVEDACCEPT";
359     case Session::STATE_SENTMODIFY:
360       return "STATE_SENTMODIFY";
361     case Session::STATE_RECEIVEDMODIFY:
362       return "STATE_RECEIVEDMODIFY";
363     case Session::STATE_SENTREJECT:
364       return "STATE_SENTREJECT";
365     case Session::STATE_RECEIVEDREJECT:
366       return "STATE_RECEIVEDREJECT";
367     case Session::STATE_SENTREDIRECT:
368       return "STATE_SENTREDIRECT";
369     case Session::STATE_SENTTERMINATE:
370       return "STATE_SENTTERMINATE";
371     case Session::STATE_RECEIVEDTERMINATE:
372       return "STATE_RECEIVEDTERMINATE";
373     case Session::STATE_INPROGRESS:
374       return "STATE_INPROGRESS";
375     case Session::STATE_DEINIT:
376       return "STATE_DEINIT";
377     default:
378       break;
379   }
380   return "STATE_" + rtc::ToString(state);
381 }
382 
BaseSession(rtc::Thread * signaling_thread,rtc::Thread * worker_thread,PortAllocator * port_allocator,const std::string & sid,const std::string & content_type,bool initiator)383 BaseSession::BaseSession(rtc::Thread* signaling_thread,
384                          rtc::Thread* worker_thread,
385                          PortAllocator* port_allocator,
386                          const std::string& sid,
387                          const std::string& content_type,
388                          bool initiator)
389     : state_(STATE_INIT),
390       error_(ERROR_NONE),
391       signaling_thread_(signaling_thread),
392       worker_thread_(worker_thread),
393       port_allocator_(port_allocator),
394       sid_(sid),
395       content_type_(content_type),
396       transport_type_(NS_GINGLE_P2P),
397       initiator_(initiator),
398       identity_(NULL),
399       ice_tiebreaker_(rtc::CreateRandomId64()),
400       role_switch_(false) {
401   ASSERT(signaling_thread->IsCurrent());
402 }
403 
~BaseSession()404 BaseSession::~BaseSession() {
405   ASSERT(signaling_thread()->IsCurrent());
406 
407   ASSERT(state_ != STATE_DEINIT);
408   LogState(state_, STATE_DEINIT);
409   state_ = STATE_DEINIT;
410   SignalState(this, state_);
411 
412   for (TransportMap::iterator iter = transports_.begin();
413        iter != transports_.end(); ++iter) {
414     delete iter->second;
415   }
416 }
417 
local_description() const418 const SessionDescription* BaseSession::local_description() const {
419   // TODO(tommi): Assert on thread correctness.
420   return local_description_.get();
421 }
422 
remote_description() const423 const SessionDescription* BaseSession::remote_description() const {
424   // TODO(tommi): Assert on thread correctness.
425   return remote_description_.get();
426 }
427 
remote_description()428 SessionDescription* BaseSession::remote_description() {
429   // TODO(tommi): Assert on thread correctness.
430   return remote_description_.get();
431 }
432 
set_local_description(const SessionDescription * sdesc)433 void BaseSession::set_local_description(const SessionDescription* sdesc) {
434   // TODO(tommi): Assert on thread correctness.
435   if (sdesc != local_description_.get())
436     local_description_.reset(sdesc);
437 }
438 
set_remote_description(SessionDescription * sdesc)439 void BaseSession::set_remote_description(SessionDescription* sdesc) {
440   // TODO(tommi): Assert on thread correctness.
441   if (sdesc != remote_description_)
442     remote_description_.reset(sdesc);
443 }
444 
initiator_description() const445 const SessionDescription* BaseSession::initiator_description() const {
446   // TODO(tommi): Assert on thread correctness.
447   return initiator_ ? local_description_.get() : remote_description_.get();
448 }
449 
SetIdentity(rtc::SSLIdentity * identity)450 bool BaseSession::SetIdentity(rtc::SSLIdentity* identity) {
451   if (identity_)
452     return false;
453   identity_ = identity;
454   for (TransportMap::iterator iter = transports_.begin();
455        iter != transports_.end(); ++iter) {
456     iter->second->SetIdentity(identity_);
457   }
458   return true;
459 }
460 
PushdownTransportDescription(ContentSource source,ContentAction action,std::string * error_desc)461 bool BaseSession::PushdownTransportDescription(ContentSource source,
462                                                ContentAction action,
463                                                std::string* error_desc) {
464   if (source == CS_LOCAL) {
465     return PushdownLocalTransportDescription(local_description(),
466                                              action,
467                                              error_desc);
468   }
469   return PushdownRemoteTransportDescription(remote_description(),
470                                             action,
471                                             error_desc);
472 }
473 
PushdownLocalTransportDescription(const SessionDescription * sdesc,ContentAction action,std::string * error_desc)474 bool BaseSession::PushdownLocalTransportDescription(
475     const SessionDescription* sdesc,
476     ContentAction action,
477     std::string* error_desc) {
478   // Update the Transports with the right information, and trigger them to
479   // start connecting.
480   for (TransportMap::iterator iter = transports_.begin();
481        iter != transports_.end(); ++iter) {
482     // If no transport info was in this session description, ret == false
483     // and we just skip this one.
484     TransportDescription tdesc;
485     bool ret = GetTransportDescription(
486         sdesc, iter->second->content_name(), &tdesc);
487     if (ret) {
488       if (!iter->second->SetLocalTransportDescription(tdesc, action,
489                                                       error_desc)) {
490         return false;
491       }
492 
493       iter->second->ConnectChannels();
494     }
495   }
496 
497   return true;
498 }
499 
PushdownRemoteTransportDescription(const SessionDescription * sdesc,ContentAction action,std::string * error_desc)500 bool BaseSession::PushdownRemoteTransportDescription(
501     const SessionDescription* sdesc,
502     ContentAction action,
503     std::string* error_desc) {
504   // Update the Transports with the right information.
505   for (TransportMap::iterator iter = transports_.begin();
506        iter != transports_.end(); ++iter) {
507     TransportDescription tdesc;
508 
509     // If no transport info was in this session description, ret == false
510     // and we just skip this one.
511     bool ret = GetTransportDescription(
512         sdesc, iter->second->content_name(), &tdesc);
513     if (ret) {
514       if (!iter->second->SetRemoteTransportDescription(tdesc, action,
515                                                        error_desc)) {
516         return false;
517       }
518     }
519   }
520 
521   return true;
522 }
523 
CreateChannel(const std::string & content_name,const std::string & channel_name,int component)524 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
525                                              const std::string& channel_name,
526                                              int component) {
527   // We create the proxy "on demand" here because we need to support
528   // creating channels at any time, even before we send or receive
529   // initiate messages, which is before we create the transports.
530   TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
531   return transproxy->CreateChannel(channel_name, component);
532 }
533 
GetChannel(const std::string & content_name,int component)534 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
535                                           int component) {
536   TransportProxy* transproxy = GetTransportProxy(content_name);
537   if (transproxy == NULL)
538     return NULL;
539 
540   return transproxy->GetChannel(component);
541 }
542 
DestroyChannel(const std::string & content_name,int component)543 void BaseSession::DestroyChannel(const std::string& content_name,
544                                  int component) {
545   TransportProxy* transproxy = GetTransportProxy(content_name);
546   ASSERT(transproxy != NULL);
547   transproxy->DestroyChannel(component);
548 }
549 
GetOrCreateTransportProxy(const std::string & content_name)550 TransportProxy* BaseSession::GetOrCreateTransportProxy(
551     const std::string& content_name) {
552   TransportProxy* transproxy = GetTransportProxy(content_name);
553   if (transproxy)
554     return transproxy;
555 
556   Transport* transport = CreateTransport(content_name);
557   transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
558   transport->SetIceTiebreaker(ice_tiebreaker_);
559   // TODO: Connect all the Transport signals to TransportProxy
560   // then to the BaseSession.
561   transport->SignalConnecting.connect(
562       this, &BaseSession::OnTransportConnecting);
563   transport->SignalWritableState.connect(
564       this, &BaseSession::OnTransportWritable);
565   transport->SignalRequestSignaling.connect(
566       this, &BaseSession::OnTransportRequestSignaling);
567   transport->SignalTransportError.connect(
568       this, &BaseSession::OnTransportSendError);
569   transport->SignalRouteChange.connect(
570       this, &BaseSession::OnTransportRouteChange);
571   transport->SignalCandidatesAllocationDone.connect(
572       this, &BaseSession::OnTransportCandidatesAllocationDone);
573   transport->SignalRoleConflict.connect(
574       this, &BaseSession::OnRoleConflict);
575   transport->SignalCompleted.connect(
576       this, &BaseSession::OnTransportCompleted);
577   transport->SignalFailed.connect(
578       this, &BaseSession::OnTransportFailed);
579 
580   transproxy = new TransportProxy(worker_thread_, sid_, content_name,
581                                   new TransportWrapper(transport));
582   transproxy->SignalCandidatesReady.connect(
583       this, &BaseSession::OnTransportProxyCandidatesReady);
584   if (identity_)
585     transproxy->SetIdentity(identity_);
586   transports_[content_name] = transproxy;
587 
588   return transproxy;
589 }
590 
GetTransport(const std::string & content_name)591 Transport* BaseSession::GetTransport(const std::string& content_name) {
592   TransportProxy* transproxy = GetTransportProxy(content_name);
593   if (transproxy == NULL)
594     return NULL;
595   return transproxy->impl();
596 }
597 
GetTransportProxy(const std::string & content_name)598 TransportProxy* BaseSession::GetTransportProxy(
599     const std::string& content_name) {
600   TransportMap::iterator iter = transports_.find(content_name);
601   return (iter != transports_.end()) ? iter->second : NULL;
602 }
603 
GetTransportProxy(const Transport * transport)604 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
605   for (TransportMap::iterator iter = transports_.begin();
606        iter != transports_.end(); ++iter) {
607     TransportProxy* transproxy = iter->second;
608     if (transproxy->impl() == transport) {
609       return transproxy;
610     }
611   }
612   return NULL;
613 }
614 
GetFirstTransportProxy()615 TransportProxy* BaseSession::GetFirstTransportProxy() {
616   if (transports_.empty())
617     return NULL;
618   return transports_.begin()->second;
619 }
620 
DestroyTransportProxy(const std::string & content_name)621 void BaseSession::DestroyTransportProxy(
622     const std::string& content_name) {
623   TransportMap::iterator iter = transports_.find(content_name);
624   if (iter != transports_.end()) {
625     delete iter->second;
626     transports_.erase(content_name);
627   }
628 }
629 
CreateTransport(const std::string & content_name)630 cricket::Transport* BaseSession::CreateTransport(
631     const std::string& content_name) {
632   ASSERT(transport_type_ == NS_GINGLE_P2P);
633   return new cricket::DtlsTransport<P2PTransport>(
634       signaling_thread(), worker_thread(), content_name,
635       port_allocator(), identity_);
636 }
637 
GetStats(SessionStats * stats)638 bool BaseSession::GetStats(SessionStats* stats) {
639   for (TransportMap::iterator iter = transports_.begin();
640        iter != transports_.end(); ++iter) {
641     std::string proxy_id = iter->second->content_name();
642     // We are ignoring not-yet-instantiated transports.
643     if (iter->second->impl()) {
644       std::string transport_id = iter->second->impl()->content_name();
645       stats->proxy_to_transport[proxy_id] = transport_id;
646       if (stats->transport_stats.find(transport_id)
647           == stats->transport_stats.end()) {
648         TransportStats subinfos;
649         if (!iter->second->impl()->GetStats(&subinfos)) {
650           return false;
651         }
652         stats->transport_stats[transport_id] = subinfos;
653       }
654     }
655   }
656   return true;
657 }
658 
SetState(State state)659 void BaseSession::SetState(State state) {
660   ASSERT(signaling_thread_->IsCurrent());
661   if (state != state_) {
662     LogState(state_, state);
663     state_ = state;
664     SignalState(this, state_);
665     signaling_thread_->Post(this, MSG_STATE);
666   }
667   SignalNewDescription();
668 }
669 
SetError(Error error,const std::string & error_desc)670 void BaseSession::SetError(Error error, const std::string& error_desc) {
671   ASSERT(signaling_thread_->IsCurrent());
672   if (error != error_) {
673     error_ = error;
674     error_desc_ = error_desc;
675     SignalError(this, error);
676   }
677 }
678 
OnSignalingReady()679 void BaseSession::OnSignalingReady() {
680   ASSERT(signaling_thread()->IsCurrent());
681   for (TransportMap::iterator iter = transports_.begin();
682        iter != transports_.end(); ++iter) {
683     iter->second->OnSignalingReady();
684   }
685 }
686 
687 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to
688 // start, remove this method once everyone calls PushdownLocalTD.
SpeculativelyConnectAllTransportChannels()689 void BaseSession::SpeculativelyConnectAllTransportChannels() {
690   // Put all transports into the connecting state.
691   for (TransportMap::iterator iter = transports_.begin();
692        iter != transports_.end(); ++iter) {
693     iter->second->ConnectChannels();
694   }
695 }
696 
OnRemoteCandidates(const std::string & content_name,const Candidates & candidates,std::string * error)697 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
698                                      const Candidates& candidates,
699                                      std::string* error) {
700   // Give candidates to the appropriate transport, and tell that transport
701   // to start connecting, if it's not already doing so.
702   TransportProxy* transproxy = GetTransportProxy(content_name);
703   if (!transproxy) {
704     *error = "Unknown content name " + content_name;
705     return false;
706   }
707   if (!transproxy->OnRemoteCandidates(candidates, error)) {
708     return false;
709   }
710   // TODO(juberti): Remove this call once we can be sure that we always have
711   // a local transport description (which will trigger the connection).
712   transproxy->ConnectChannels();
713   return true;
714 }
715 
MaybeEnableMuxingSupport()716 bool BaseSession::MaybeEnableMuxingSupport() {
717   // We need both a local and remote description to decide if we should mux.
718   if ((state_ == STATE_SENTINITIATE ||
719       state_ == STATE_RECEIVEDINITIATE) &&
720       ((local_description_ == NULL) ||
721       (remote_description_ == NULL))) {
722     return false;
723   }
724 
725   // In order to perform the multiplexing, we need all proxies to be in the
726   // negotiated state, i.e. to have implementations underneath.
727   // Ensure that this is the case, regardless of whether we are going to mux.
728   for (TransportMap::iterator iter = transports_.begin();
729        iter != transports_.end(); ++iter) {
730     ASSERT(iter->second->negotiated());
731     if (!iter->second->negotiated())
732       return false;
733   }
734 
735   // If both sides agree to BUNDLE, mux all the specified contents onto the
736   // transport belonging to the first content name in the BUNDLE group.
737   // If the contents are already muxed, this will be a no-op.
738   // TODO(juberti): Should this check that local and remote have configured
739   // BUNDLE the same way?
740   bool candidates_allocated = IsCandidateAllocationDone();
741   const ContentGroup* local_bundle_group =
742       local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
743   const ContentGroup* remote_bundle_group =
744       remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
745   if (local_bundle_group && remote_bundle_group &&
746       local_bundle_group->FirstContentName()) {
747     const std::string* content_name = local_bundle_group->FirstContentName();
748     const ContentInfo* content =
749         local_description_->GetContentByName(*content_name);
750     ASSERT(content != NULL);
751     if (!SetSelectedProxy(content->name, local_bundle_group)) {
752       LOG(LS_WARNING) << "Failed to set up BUNDLE";
753       return false;
754     }
755 
756     // If we weren't done gathering before, we might be done now, as a result
757     // of enabling mux.
758     LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
759                  << *content_name;
760     if (!candidates_allocated) {
761       MaybeCandidateAllocationDone();
762     }
763   } else {
764     LOG(LS_INFO) << "No BUNDLE information, not bundling.";
765   }
766   return true;
767 }
768 
SetSelectedProxy(const std::string & content_name,const ContentGroup * muxed_group)769 bool BaseSession::SetSelectedProxy(const std::string& content_name,
770                                    const ContentGroup* muxed_group) {
771   TransportProxy* selected_proxy = GetTransportProxy(content_name);
772   if (!selected_proxy) {
773     return false;
774   }
775 
776   ASSERT(selected_proxy->negotiated());
777   for (TransportMap::iterator iter = transports_.begin();
778        iter != transports_.end(); ++iter) {
779     // If content is part of the mux group, then repoint its proxy at the
780     // transport object that we have chosen to mux onto. If the proxy
781     // is already pointing at the right object, it will be a no-op.
782     if (muxed_group->HasContentName(iter->first) &&
783         !iter->second->SetupMux(selected_proxy)) {
784       return false;
785     }
786   }
787   return true;
788 }
789 
OnTransportCandidatesAllocationDone(Transport * transport)790 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
791   // TODO(juberti): This is a clunky way of processing the done signal. Instead,
792   // TransportProxy should receive the done signal directly, set its allocated
793   // flag internally, and then reissue the done signal to Session.
794   // Overall we should make TransportProxy receive *all* the signals from
795   // Transport, since this removes the need to manually iterate over all
796   // the transports, as is needed to make sure signals are handled properly
797   // when BUNDLEing.
798   // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
799   // that make it prohibitively difficult to run dbg builds. Disabled for now.
800   //ASSERT(!IsCandidateAllocationDone());
801   for (TransportMap::iterator iter = transports_.begin();
802        iter != transports_.end(); ++iter) {
803     if (iter->second->impl() == transport) {
804       iter->second->set_candidates_allocated(true);
805     }
806   }
807   MaybeCandidateAllocationDone();
808 }
809 
IsCandidateAllocationDone() const810 bool BaseSession::IsCandidateAllocationDone() const {
811   for (TransportMap::const_iterator iter = transports_.begin();
812        iter != transports_.end(); ++iter) {
813     if (!iter->second->candidates_allocated())
814       return false;
815   }
816   return true;
817 }
818 
MaybeCandidateAllocationDone()819 void BaseSession::MaybeCandidateAllocationDone() {
820   if (IsCandidateAllocationDone()) {
821     LOG(LS_INFO) << "Candidate gathering is complete.";
822     OnCandidatesAllocationDone();
823   }
824 }
825 
OnRoleConflict()826 void BaseSession::OnRoleConflict() {
827   if (role_switch_) {
828     LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
829     return;
830   }
831 
832   role_switch_ = true;
833   for (TransportMap::iterator iter = transports_.begin();
834        iter != transports_.end(); ++iter) {
835     // Role will be reverse of initial role setting.
836     IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
837     iter->second->SetIceRole(role);
838   }
839 }
840 
LogState(State old_state,State new_state)841 void BaseSession::LogState(State old_state, State new_state) {
842   LOG(LS_INFO) << "Session:" << id()
843                << " Old state:" << StateToString(old_state)
844                << " New state:" << StateToString(new_state)
845                << " Type:" << content_type()
846                << " Transport:" << transport_type();
847 }
848 
849 // static
GetTransportDescription(const SessionDescription * description,const std::string & content_name,TransportDescription * tdesc)850 bool BaseSession::GetTransportDescription(const SessionDescription* description,
851                                           const std::string& content_name,
852                                           TransportDescription* tdesc) {
853   if (!description || !tdesc) {
854     return false;
855   }
856   const TransportInfo* transport_info =
857       description->GetTransportInfoByName(content_name);
858   if (!transport_info) {
859     return false;
860   }
861   *tdesc = transport_info->description;
862   return true;
863 }
864 
SignalNewDescription()865 void BaseSession::SignalNewDescription() {
866   ContentAction action;
867   ContentSource source;
868   if (!GetContentAction(&action, &source)) {
869     return;
870   }
871   if (source == CS_LOCAL) {
872     SignalNewLocalDescription(this, action);
873   } else {
874     SignalNewRemoteDescription(this, action);
875   }
876 }
877 
GetContentAction(ContentAction * action,ContentSource * source)878 bool BaseSession::GetContentAction(ContentAction* action,
879                                    ContentSource* source) {
880   switch (state_) {
881     // new local description
882     case STATE_SENTINITIATE:
883       *action = CA_OFFER;
884       *source = CS_LOCAL;
885       break;
886     case STATE_SENTPRACCEPT:
887       *action = CA_PRANSWER;
888       *source = CS_LOCAL;
889       break;
890     case STATE_SENTACCEPT:
891       *action = CA_ANSWER;
892       *source = CS_LOCAL;
893       break;
894     // new remote description
895     case STATE_RECEIVEDINITIATE:
896       *action = CA_OFFER;
897       *source = CS_REMOTE;
898       break;
899     case STATE_RECEIVEDPRACCEPT:
900       *action = CA_PRANSWER;
901       *source = CS_REMOTE;
902       break;
903     case STATE_RECEIVEDACCEPT:
904       *action = CA_ANSWER;
905       *source = CS_REMOTE;
906       break;
907     default:
908       return false;
909   }
910   return true;
911 }
912 
OnMessage(rtc::Message * pmsg)913 void BaseSession::OnMessage(rtc::Message *pmsg) {
914   switch (pmsg->message_id) {
915   case MSG_TIMEOUT:
916     // Session timeout has occured.
917     SetError(ERROR_TIME, "Session timeout has occured.");
918     break;
919 
920   case MSG_STATE:
921     switch (state_) {
922     case STATE_SENTACCEPT:
923     case STATE_RECEIVEDACCEPT:
924       SetState(STATE_INPROGRESS);
925       break;
926 
927     default:
928       // Explicitly ignoring some states here.
929       break;
930     }
931     break;
932   }
933 }
934 
Session(SessionManager * session_manager,const std::string & local_name,const std::string & initiator_name,const std::string & sid,const std::string & content_type,SessionClient * client)935 Session::Session(SessionManager* session_manager,
936                  const std::string& local_name,
937                  const std::string& initiator_name,
938                  const std::string& sid,
939                  const std::string& content_type,
940                  SessionClient* client)
941     : BaseSession(session_manager->signaling_thread(),
942                   session_manager->worker_thread(),
943                   session_manager->port_allocator(),
944                   sid, content_type, initiator_name == local_name) {
945   ASSERT(client != NULL);
946   session_manager_ = session_manager;
947   local_name_ = local_name;
948   initiator_name_ = initiator_name;
949   transport_parser_ = new P2PTransportParser();
950   client_ = client;
951   initiate_acked_ = false;
952   current_protocol_ = PROTOCOL_HYBRID;
953 }
954 
~Session()955 Session::~Session() {
956   delete transport_parser_;
957 }
958 
Initiate(const std::string & to,const SessionDescription * sdesc)959 bool Session::Initiate(const std::string& to,
960                        const SessionDescription* sdesc) {
961   ASSERT(signaling_thread()->IsCurrent());
962   SessionError error;
963 
964   // Only from STATE_INIT
965   if (state() != STATE_INIT)
966     return false;
967 
968   // Setup for signaling.
969   set_remote_name(to);
970   set_local_description(sdesc);
971   if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
972                               &error)) {
973     LOG(LS_ERROR) << "Could not create transports: " << error.text;
974     return false;
975   }
976 
977   if (!SendInitiateMessage(sdesc, &error)) {
978     LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
979     return false;
980   }
981 
982   // We need to connect transport proxy and impl here so that we can process
983   // the TransportDescriptions.
984   SpeculativelyConnectAllTransportChannels();
985 
986   PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL);
987   SetState(Session::STATE_SENTINITIATE);
988   return true;
989 }
990 
Accept(const SessionDescription * sdesc)991 bool Session::Accept(const SessionDescription* sdesc) {
992   ASSERT(signaling_thread()->IsCurrent());
993 
994   // Only if just received initiate
995   if (state() != STATE_RECEIVEDINITIATE)
996     return false;
997 
998   // Setup for signaling.
999   set_local_description(sdesc);
1000 
1001   SessionError error;
1002   if (!SendAcceptMessage(sdesc, &error)) {
1003     LOG(LS_ERROR) << "Could not send accept message: " << error.text;
1004     return false;
1005   }
1006   // TODO(juberti): Add BUNDLE support to transport-info messages.
1007   PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL);
1008   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
1009   SetState(Session::STATE_SENTACCEPT);
1010   return true;
1011 }
1012 
Reject(const std::string & reason)1013 bool Session::Reject(const std::string& reason) {
1014   ASSERT(signaling_thread()->IsCurrent());
1015 
1016   // Reject is sent in response to an initiate or modify, to reject the
1017   // request
1018   if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
1019     return false;
1020 
1021   SessionError error;
1022   if (!SendRejectMessage(reason, &error)) {
1023     LOG(LS_ERROR) << "Could not send reject message: " << error.text;
1024     return false;
1025   }
1026 
1027   SetState(STATE_SENTREJECT);
1028   return true;
1029 }
1030 
TerminateWithReason(const std::string & reason)1031 bool Session::TerminateWithReason(const std::string& reason) {
1032   ASSERT(signaling_thread()->IsCurrent());
1033 
1034   // Either side can terminate, at any time.
1035   switch (state()) {
1036     case STATE_SENTTERMINATE:
1037     case STATE_RECEIVEDTERMINATE:
1038       return false;
1039 
1040     case STATE_SENTREJECT:
1041     case STATE_RECEIVEDREJECT:
1042       // We don't need to send terminate if we sent or received a reject...
1043       // it's implicit.
1044       break;
1045 
1046     default:
1047       SessionError error;
1048       if (!SendTerminateMessage(reason, &error)) {
1049         LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
1050         return false;
1051       }
1052       break;
1053   }
1054 
1055   SetState(STATE_SENTTERMINATE);
1056   return true;
1057 }
1058 
SendInfoMessage(const XmlElements & elems,const std::string & remote_name)1059 bool Session::SendInfoMessage(const XmlElements& elems,
1060                               const std::string& remote_name) {
1061   ASSERT(signaling_thread()->IsCurrent());
1062   SessionError error;
1063   if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
1064     LOG(LS_ERROR) << "Could not send info message " << error.text;
1065     return false;
1066   }
1067   return true;
1068 }
1069 
SendDescriptionInfoMessage(const ContentInfos & contents)1070 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
1071   XmlElements elems;
1072   WriteError write_error;
1073   if (!WriteDescriptionInfo(current_protocol_,
1074                             contents,
1075                             GetContentParsers(),
1076                             &elems, &write_error)) {
1077     LOG(LS_ERROR) << "Could not write description info message: "
1078                   << write_error.text;
1079     return false;
1080   }
1081   SessionError error;
1082   if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
1083     LOG(LS_ERROR) << "Could not send description info message: "
1084                   << error.text;
1085     return false;
1086   }
1087   return true;
1088 }
1089 
GetEmptyTransportInfos(const ContentInfos & contents) const1090 TransportInfos Session::GetEmptyTransportInfos(
1091     const ContentInfos& contents) const {
1092   TransportInfos tinfos;
1093   for (ContentInfos::const_iterator content = contents.begin();
1094        content != contents.end(); ++content) {
1095     tinfos.push_back(TransportInfo(content->name,
1096                                    TransportDescription(transport_type(),
1097                                                         std::string(),
1098                                                         std::string())));
1099   }
1100   return tinfos;
1101 }
1102 
OnRemoteCandidates(const TransportInfos & tinfos,ParseError * error)1103 bool Session::OnRemoteCandidates(
1104     const TransportInfos& tinfos, ParseError* error) {
1105   for (TransportInfos::const_iterator tinfo = tinfos.begin();
1106        tinfo != tinfos.end(); ++tinfo) {
1107     std::string str_error;
1108     if (!BaseSession::OnRemoteCandidates(
1109         tinfo->content_name, tinfo->description.candidates, &str_error)) {
1110       return BadParse(str_error, error);
1111     }
1112   }
1113   return true;
1114 }
1115 
CreateTransportProxies(const TransportInfos & tinfos,SessionError * error)1116 bool Session::CreateTransportProxies(const TransportInfos& tinfos,
1117                                      SessionError* error) {
1118   for (TransportInfos::const_iterator tinfo = tinfos.begin();
1119        tinfo != tinfos.end(); ++tinfo) {
1120     if (tinfo->description.transport_type != transport_type()) {
1121       error->SetText("No supported transport in offer.");
1122       return false;
1123     }
1124 
1125     GetOrCreateTransportProxy(tinfo->content_name);
1126   }
1127   return true;
1128 }
1129 
GetTransportParsers()1130 TransportParserMap Session::GetTransportParsers() {
1131   TransportParserMap parsers;
1132   parsers[transport_type()] = transport_parser_;
1133   return parsers;
1134 }
1135 
GetCandidateTranslators()1136 CandidateTranslatorMap Session::GetCandidateTranslators() {
1137   CandidateTranslatorMap translators;
1138   // NOTE: This technique makes it impossible to parse G-ICE
1139   // candidates in session-initiate messages because the channels
1140   // aren't yet created at that point.  Since we don't use candidates
1141   // in session-initiate messages, we should be OK.  Once we switch to
1142   // ICE, this translation shouldn't be necessary.
1143   for (TransportMap::const_iterator iter = transport_proxies().begin();
1144        iter != transport_proxies().end(); ++iter) {
1145     translators[iter->first] = iter->second;
1146   }
1147   return translators;
1148 }
1149 
GetContentParsers()1150 ContentParserMap Session::GetContentParsers() {
1151   ContentParserMap parsers;
1152   parsers[content_type()] = client_;
1153   // We need to be able parse both RTP-based and SCTP-based Jingle
1154   // with the same client.
1155   if (content_type() == NS_JINGLE_RTP) {
1156     parsers[NS_JINGLE_DRAFT_SCTP] = client_;
1157   }
1158   return parsers;
1159 }
1160 
OnTransportRequestSignaling(Transport * transport)1161 void Session::OnTransportRequestSignaling(Transport* transport) {
1162   ASSERT(signaling_thread()->IsCurrent());
1163   TransportProxy* transproxy = GetTransportProxy(transport);
1164   ASSERT(transproxy != NULL);
1165   if (transproxy) {
1166     // Reset candidate allocation status for the transport proxy.
1167     transproxy->set_candidates_allocated(false);
1168   }
1169   SignalRequestSignaling(this);
1170 }
1171 
OnTransportConnecting(Transport * transport)1172 void Session::OnTransportConnecting(Transport* transport) {
1173   // This is an indication that we should begin watching the writability
1174   // state of the transport.
1175   OnTransportWritable(transport);
1176 }
1177 
OnTransportWritable(Transport * transport)1178 void Session::OnTransportWritable(Transport* transport) {
1179   ASSERT(signaling_thread()->IsCurrent());
1180 
1181   // If the transport is not writable, start a timer to make sure that it
1182   // becomes writable within a reasonable amount of time.  If it does not, we
1183   // terminate since we can't actually send data.  If the transport is writable,
1184   // cancel the timer.  Note that writability transitions may occur repeatedly
1185   // during the lifetime of the session.
1186   signaling_thread()->Clear(this, MSG_TIMEOUT);
1187   if (transport->HasChannels() && !transport->writable()) {
1188     signaling_thread()->PostDelayed(
1189         session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
1190   }
1191 }
1192 
OnTransportProxyCandidatesReady(TransportProxy * transproxy,const Candidates & candidates)1193 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
1194                                               const Candidates& candidates) {
1195   ASSERT(signaling_thread()->IsCurrent());
1196   if (transproxy != NULL) {
1197     if (initiator() && !initiate_acked_) {
1198       // TODO: This is to work around server re-ordering
1199       // messages.  We send the candidates once the session-initiate
1200       // is acked.  Once we have fixed the server to guarantee message
1201       // order, we can remove this case.
1202       transproxy->AddUnsentCandidates(candidates);
1203     } else {
1204       if (!transproxy->negotiated()) {
1205         transproxy->AddSentCandidates(candidates);
1206       }
1207       SessionError error;
1208       if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
1209         LOG(LS_ERROR) << "Could not send transport info message: "
1210                       << error.text;
1211         return;
1212       }
1213     }
1214   }
1215 }
1216 
OnTransportSendError(Transport * transport,const buzz::XmlElement * stanza,const buzz::QName & name,const std::string & type,const std::string & text,const buzz::XmlElement * extra_info)1217 void Session::OnTransportSendError(Transport* transport,
1218                                    const buzz::XmlElement* stanza,
1219                                    const buzz::QName& name,
1220                                    const std::string& type,
1221                                    const std::string& text,
1222                                    const buzz::XmlElement* extra_info) {
1223   ASSERT(signaling_thread()->IsCurrent());
1224   SignalErrorMessage(this, stanza, name, type, text, extra_info);
1225 }
1226 
OnIncomingMessage(const SessionMessage & msg)1227 void Session::OnIncomingMessage(const SessionMessage& msg) {
1228   ASSERT(signaling_thread()->IsCurrent());
1229   ASSERT(state() == STATE_INIT || msg.from == remote_name());
1230 
1231   if (current_protocol_== PROTOCOL_HYBRID) {
1232     if (msg.protocol == PROTOCOL_GINGLE) {
1233       current_protocol_ = PROTOCOL_GINGLE;
1234     } else {
1235       current_protocol_ = PROTOCOL_JINGLE;
1236     }
1237   }
1238 
1239   bool valid = false;
1240   MessageError error;
1241   switch (msg.type) {
1242     case ACTION_SESSION_INITIATE:
1243       valid = OnInitiateMessage(msg, &error);
1244       break;
1245     case ACTION_SESSION_INFO:
1246       valid = OnInfoMessage(msg);
1247       break;
1248     case ACTION_SESSION_ACCEPT:
1249       valid = OnAcceptMessage(msg, &error);
1250       break;
1251     case ACTION_SESSION_REJECT:
1252       valid = OnRejectMessage(msg, &error);
1253       break;
1254     case ACTION_SESSION_TERMINATE:
1255       valid = OnTerminateMessage(msg, &error);
1256       break;
1257     case ACTION_TRANSPORT_INFO:
1258       valid = OnTransportInfoMessage(msg, &error);
1259       break;
1260     case ACTION_TRANSPORT_ACCEPT:
1261       valid = OnTransportAcceptMessage(msg, &error);
1262       break;
1263     case ACTION_DESCRIPTION_INFO:
1264       valid = OnDescriptionInfoMessage(msg, &error);
1265       break;
1266     default:
1267       valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
1268                          "unknown session message type",
1269                          &error);
1270   }
1271 
1272   if (valid) {
1273     SendAcknowledgementMessage(msg.stanza);
1274   } else {
1275     SignalErrorMessage(this, msg.stanza, error.type,
1276                        "modify", error.text, NULL);
1277   }
1278 }
1279 
OnIncomingResponse(const buzz::XmlElement * orig_stanza,const buzz::XmlElement * response_stanza,const SessionMessage & msg)1280 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
1281                                  const buzz::XmlElement* response_stanza,
1282                                  const SessionMessage& msg) {
1283   ASSERT(signaling_thread()->IsCurrent());
1284 
1285   if (msg.type == ACTION_SESSION_INITIATE) {
1286     OnInitiateAcked();
1287   }
1288 }
1289 
OnInitiateAcked()1290 void Session::OnInitiateAcked() {
1291   // TODO: This is to work around server re-ordering
1292   // messages.  We send the candidates once the session-initiate
1293   // is acked.  Once we have fixed the server to guarantee message
1294   // order, we can remove this case.
1295   if (!initiate_acked_) {
1296     initiate_acked_ = true;
1297     SessionError error;
1298     SendAllUnsentTransportInfoMessages(&error);
1299   }
1300 }
1301 
OnFailedSend(const buzz::XmlElement * orig_stanza,const buzz::XmlElement * error_stanza)1302 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
1303                            const buzz::XmlElement* error_stanza) {
1304   ASSERT(signaling_thread()->IsCurrent());
1305 
1306   SessionMessage msg;
1307   ParseError parse_error;
1308   if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
1309     LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
1310                   << ":" << orig_stanza;
1311     return;
1312   }
1313 
1314   // If the error is a session redirect, call OnRedirectError, which will
1315   // continue the session with a new remote JID.
1316   SessionRedirect redirect;
1317   if (FindSessionRedirect(error_stanza, &redirect)) {
1318     SessionError error;
1319     if (!OnRedirectError(redirect, &error)) {
1320       // TODO: Should we send a message back?  The standard
1321       // says nothing about it.
1322       std::ostringstream desc;
1323       desc << "Failed to redirect: " << error.text;
1324       LOG(LS_ERROR) << desc.str();
1325       SetError(ERROR_RESPONSE, desc.str());
1326     }
1327     return;
1328   }
1329 
1330   std::string error_type = "cancel";
1331 
1332   const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
1333   if (error) {
1334     error_type = error->Attr(buzz::QN_TYPE);
1335 
1336     LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
1337                   << "in response to:\n" << orig_stanza->Str();
1338   } else {
1339     // don't crash if <error> is missing
1340     LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
1341     return;
1342   }
1343 
1344   if (msg.type == ACTION_TRANSPORT_INFO) {
1345     // Transport messages frequently generate errors because they are sent right
1346     // when we detect a network failure.  For that reason, we ignore such
1347     // errors, because if we do not establish writability again, we will
1348     // terminate anyway.  The exceptions are transport-specific error tags,
1349     // which we pass on to the respective transport.
1350   } else if ((error_type != "continue") && (error_type != "wait")) {
1351     // We do not set an error if the other side said it is okay to continue
1352     // (possibly after waiting).  These errors can be ignored.
1353     SetError(ERROR_RESPONSE, "");
1354   }
1355 }
1356 
OnInitiateMessage(const SessionMessage & msg,MessageError * error)1357 bool Session::OnInitiateMessage(const SessionMessage& msg,
1358                                 MessageError* error) {
1359   if (!CheckState(STATE_INIT, error))
1360     return false;
1361 
1362   SessionInitiate init;
1363   if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
1364                             GetContentParsers(), GetTransportParsers(),
1365                             GetCandidateTranslators(),
1366                             &init, error))
1367     return false;
1368 
1369   SessionError session_error;
1370   if (!CreateTransportProxies(init.transports, &session_error)) {
1371     return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
1372                       session_error.text, error);
1373   }
1374 
1375   set_remote_name(msg.from);
1376   set_initiator_name(msg.initiator);
1377   set_remote_description(new SessionDescription(init.ClearContents(),
1378                                                 init.transports,
1379                                                 init.groups));
1380   // Updating transport with TransportDescription.
1381   PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL);
1382   SetState(STATE_RECEIVEDINITIATE);
1383 
1384   // Users of Session may listen to state change and call Reject().
1385   if (state() != STATE_SENTREJECT) {
1386     if (!OnRemoteCandidates(init.transports, error))
1387       return false;
1388 
1389     // TODO(juberti): Auto-generate and push down the local transport answer.
1390     // This is necessary for trickling to work with RFC 5245 ICE.
1391   }
1392   return true;
1393 }
1394 
OnAcceptMessage(const SessionMessage & msg,MessageError * error)1395 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
1396   if (!CheckState(STATE_SENTINITIATE, error))
1397     return false;
1398 
1399   SessionAccept accept;
1400   if (!ParseSessionAccept(msg.protocol, msg.action_elem,
1401                           GetContentParsers(), GetTransportParsers(),
1402                           GetCandidateTranslators(),
1403                           &accept, error)) {
1404     return false;
1405   }
1406 
1407   // If we get an accept, we can assume the initiate has been
1408   // received, even if we haven't gotten an IQ response.
1409   OnInitiateAcked();
1410 
1411   set_remote_description(new SessionDescription(accept.ClearContents(),
1412                                                 accept.transports,
1413                                                 accept.groups));
1414   // Updating transport with TransportDescription.
1415   PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL);
1416   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
1417   SetState(STATE_RECEIVEDACCEPT);
1418 
1419   if (!OnRemoteCandidates(accept.transports, error))
1420     return false;
1421 
1422   return true;
1423 }
1424 
OnRejectMessage(const SessionMessage & msg,MessageError * error)1425 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
1426   if (!CheckState(STATE_SENTINITIATE, error))
1427     return false;
1428 
1429   SetState(STATE_RECEIVEDREJECT);
1430   return true;
1431 }
1432 
OnInfoMessage(const SessionMessage & msg)1433 bool Session::OnInfoMessage(const SessionMessage& msg) {
1434   SignalInfoMessage(this, msg.action_elem);
1435   return true;
1436 }
1437 
OnTerminateMessage(const SessionMessage & msg,MessageError * error)1438 bool Session::OnTerminateMessage(const SessionMessage& msg,
1439                                  MessageError* error) {
1440   SessionTerminate term;
1441   if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
1442     return false;
1443 
1444   SignalReceivedTerminateReason(this, term.reason);
1445   if (term.debug_reason != buzz::STR_EMPTY) {
1446     LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
1447   }
1448 
1449   SetState(STATE_RECEIVEDTERMINATE);
1450   return true;
1451 }
1452 
OnTransportInfoMessage(const SessionMessage & msg,MessageError * error)1453 bool Session::OnTransportInfoMessage(const SessionMessage& msg,
1454                                      MessageError* error) {
1455   TransportInfos tinfos;
1456   if (!ParseTransportInfos(msg.protocol, msg.action_elem,
1457                            initiator_description()->contents(),
1458                            GetTransportParsers(), GetCandidateTranslators(),
1459                            &tinfos, error))
1460     return false;
1461 
1462   if (!OnRemoteCandidates(tinfos, error))
1463     return false;
1464 
1465   return true;
1466 }
1467 
OnTransportAcceptMessage(const SessionMessage & msg,MessageError * error)1468 bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
1469                                        MessageError* error) {
1470   // TODO: Currently here only for compatibility with
1471   // Gingle 1.1 clients (notably, Google Voice).
1472   return true;
1473 }
1474 
OnDescriptionInfoMessage(const SessionMessage & msg,MessageError * error)1475 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
1476                               MessageError* error) {
1477   if (!CheckState(STATE_INPROGRESS, error))
1478     return false;
1479 
1480   DescriptionInfo description_info;
1481   if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
1482                             GetContentParsers(), GetTransportParsers(),
1483                             GetCandidateTranslators(),
1484                             &description_info, error)) {
1485     return false;
1486   }
1487 
1488   ContentInfos& updated_contents = description_info.contents;
1489 
1490   // TODO: Currently, reflector sends back
1491   // video stream updates even for an audio-only call, which causes
1492   // this to fail.  Put this back once reflector is fixed.
1493   //
1494   // ContentInfos::iterator it;
1495   // First, ensure all updates are valid before modifying remote_description_.
1496   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1497   //   if (remote_description()->GetContentByName(it->name) == NULL) {
1498   //     return false;
1499   //   }
1500   // }
1501 
1502   // TODO: We used to replace contents from an update, but
1503   // that no longer works with partial updates.  We need to figure out
1504   // a way to merge patial updates into contents.  For now, users of
1505   // Session should listen to SignalRemoteDescriptionUpdate and handle
1506   // updates.  They should not expect remote_description to be the
1507   // latest value.
1508   //
1509   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1510   //     remote_description()->RemoveContentByName(it->name);
1511   //     remote_description()->AddContent(it->name, it->type, it->description);
1512   //   }
1513   // }
1514 
1515   SignalRemoteDescriptionUpdate(this, updated_contents);
1516   return true;
1517 }
1518 
BareJidsEqual(const std::string & name1,const std::string & name2)1519 bool BareJidsEqual(const std::string& name1,
1520                    const std::string& name2) {
1521   buzz::Jid jid1(name1);
1522   buzz::Jid jid2(name2);
1523 
1524   return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
1525 }
1526 
OnRedirectError(const SessionRedirect & redirect,SessionError * error)1527 bool Session::OnRedirectError(const SessionRedirect& redirect,
1528                               SessionError* error) {
1529   MessageError message_error;
1530   if (!CheckState(STATE_SENTINITIATE, &message_error)) {
1531     return BadWrite(message_error.text, error);
1532   }
1533 
1534   if (!BareJidsEqual(remote_name(), redirect.target))
1535     return BadWrite("Redirection not allowed: must be the same bare jid.",
1536                     error);
1537 
1538   // When we receive a redirect, we point the session at the new JID
1539   // and resend the candidates.
1540   set_remote_name(redirect.target);
1541   return (SendInitiateMessage(local_description(), error) &&
1542           ResendAllTransportInfoMessages(error));
1543 }
1544 
CheckState(State expected,MessageError * error)1545 bool Session::CheckState(State expected, MessageError* error) {
1546   if (state() != expected) {
1547     // The server can deliver messages out of order/repeated for various
1548     // reasons. For example, if the server does not recive our iq response,
1549     // it could assume that the iq it sent was lost, and will then send
1550     // it again. Ideally, we should implement reliable messaging with
1551     // duplicate elimination.
1552     return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
1553                       "message not allowed in current state",
1554                       error);
1555   }
1556   return true;
1557 }
1558 
SetError(Error error,const std::string & error_desc)1559 void Session::SetError(Error error, const std::string& error_desc) {
1560   BaseSession::SetError(error, error_desc);
1561   if (error != ERROR_NONE)
1562     signaling_thread()->Post(this, MSG_ERROR);
1563 }
1564 
OnMessage(rtc::Message * pmsg)1565 void Session::OnMessage(rtc::Message* pmsg) {
1566   // preserve this because BaseSession::OnMessage may modify it
1567   State orig_state = state();
1568 
1569   BaseSession::OnMessage(pmsg);
1570 
1571   switch (pmsg->message_id) {
1572   case MSG_ERROR:
1573     TerminateWithReason(STR_TERMINATE_ERROR);
1574     break;
1575 
1576   case MSG_STATE:
1577     switch (orig_state) {
1578     case STATE_SENTREJECT:
1579     case STATE_RECEIVEDREJECT:
1580       // Assume clean termination.
1581       Terminate();
1582       break;
1583 
1584     case STATE_SENTTERMINATE:
1585     case STATE_RECEIVEDTERMINATE:
1586       session_manager_->DestroySession(this);
1587       break;
1588 
1589     default:
1590       // Explicitly ignoring some states here.
1591       break;
1592     }
1593     break;
1594   }
1595 }
1596 
SendInitiateMessage(const SessionDescription * sdesc,SessionError * error)1597 bool Session::SendInitiateMessage(const SessionDescription* sdesc,
1598                                   SessionError* error) {
1599   SessionInitiate init;
1600   init.contents = sdesc->contents();
1601   init.transports = GetEmptyTransportInfos(init.contents);
1602   init.groups = sdesc->groups();
1603   return SendMessage(ACTION_SESSION_INITIATE, init, error);
1604 }
1605 
WriteSessionAction(SignalingProtocol protocol,const SessionInitiate & init,XmlElements * elems,WriteError * error)1606 bool Session::WriteSessionAction(
1607     SignalingProtocol protocol, const SessionInitiate& init,
1608     XmlElements* elems, WriteError* error) {
1609   return WriteSessionInitiate(protocol, init.contents, init.transports,
1610                               GetContentParsers(), GetTransportParsers(),
1611                               GetCandidateTranslators(), init.groups,
1612                               elems, error);
1613 }
1614 
SendAcceptMessage(const SessionDescription * sdesc,SessionError * error)1615 bool Session::SendAcceptMessage(const SessionDescription* sdesc,
1616                                 SessionError* error) {
1617   XmlElements elems;
1618   if (!WriteSessionAccept(current_protocol_,
1619                           sdesc->contents(),
1620                           GetEmptyTransportInfos(sdesc->contents()),
1621                           GetContentParsers(), GetTransportParsers(),
1622                           GetCandidateTranslators(), sdesc->groups(),
1623                           &elems, error)) {
1624     return false;
1625   }
1626   return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
1627 }
1628 
SendRejectMessage(const std::string & reason,SessionError * error)1629 bool Session::SendRejectMessage(const std::string& reason,
1630                                 SessionError* error) {
1631   SessionTerminate term(reason);
1632   return SendMessage(ACTION_SESSION_REJECT, term, error);
1633 }
1634 
SendTerminateMessage(const std::string & reason,SessionError * error)1635 bool Session::SendTerminateMessage(const std::string& reason,
1636                                    SessionError* error) {
1637   SessionTerminate term(reason);
1638   return SendMessage(ACTION_SESSION_TERMINATE, term, error);
1639 }
1640 
WriteSessionAction(SignalingProtocol protocol,const SessionTerminate & term,XmlElements * elems,WriteError * error)1641 bool Session::WriteSessionAction(SignalingProtocol protocol,
1642                                  const SessionTerminate& term,
1643                                  XmlElements* elems, WriteError* error) {
1644   WriteSessionTerminate(protocol, term, elems);
1645   return true;
1646 }
1647 
SendTransportInfoMessage(const TransportInfo & tinfo,SessionError * error)1648 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
1649                                        SessionError* error) {
1650   return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
1651 }
1652 
SendTransportInfoMessage(const TransportProxy * transproxy,const Candidates & candidates,SessionError * error)1653 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
1654                                        const Candidates& candidates,
1655                                        SessionError* error) {
1656   return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
1657       TransportDescription(transproxy->type(), std::vector<std::string>(),
1658                            std::string(), std::string(), ICEMODE_FULL,
1659                            CONNECTIONROLE_NONE, NULL, candidates)), error);
1660 }
1661 
WriteSessionAction(SignalingProtocol protocol,const TransportInfo & tinfo,XmlElements * elems,WriteError * error)1662 bool Session::WriteSessionAction(SignalingProtocol protocol,
1663                                  const TransportInfo& tinfo,
1664                                  XmlElements* elems, WriteError* error) {
1665   TransportInfos tinfos;
1666   tinfos.push_back(tinfo);
1667   return WriteTransportInfos(protocol, tinfos,
1668                              GetTransportParsers(), GetCandidateTranslators(),
1669                              elems, error);
1670 }
1671 
ResendAllTransportInfoMessages(SessionError * error)1672 bool Session::ResendAllTransportInfoMessages(SessionError* error) {
1673   for (TransportMap::const_iterator iter = transport_proxies().begin();
1674        iter != transport_proxies().end(); ++iter) {
1675     TransportProxy* transproxy = iter->second;
1676     if (transproxy->sent_candidates().size() > 0) {
1677       if (!SendTransportInfoMessage(
1678               transproxy, transproxy->sent_candidates(), error)) {
1679         LOG(LS_ERROR) << "Could not resend transport info messages: "
1680                       << error->text;
1681         return false;
1682       }
1683       transproxy->ClearSentCandidates();
1684     }
1685   }
1686   return true;
1687 }
1688 
SendAllUnsentTransportInfoMessages(SessionError * error)1689 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
1690   for (TransportMap::const_iterator iter = transport_proxies().begin();
1691        iter != transport_proxies().end(); ++iter) {
1692     TransportProxy* transproxy = iter->second;
1693     if (transproxy->unsent_candidates().size() > 0) {
1694       if (!SendTransportInfoMessage(
1695               transproxy, transproxy->unsent_candidates(), error)) {
1696         LOG(LS_ERROR) << "Could not send unsent transport info messages: "
1697                       << error->text;
1698         return false;
1699       }
1700       transproxy->ClearUnsentCandidates();
1701     }
1702   }
1703   return true;
1704 }
1705 
SendMessage(ActionType type,const XmlElements & action_elems,SessionError * error)1706 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1707                           SessionError* error) {
1708     return SendMessage(type, action_elems, remote_name(), error);
1709 }
1710 
SendMessage(ActionType type,const XmlElements & action_elems,const std::string & remote_name,SessionError * error)1711 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1712                           const std::string& remote_name, SessionError* error) {
1713   rtc::scoped_ptr<buzz::XmlElement> stanza(
1714       new buzz::XmlElement(buzz::QN_IQ));
1715 
1716   SessionMessage msg(current_protocol_, type, id(), initiator_name());
1717   msg.to = remote_name;
1718   WriteSessionMessage(msg, action_elems, stanza.get());
1719 
1720   SignalOutgoingMessage(this, stanza.get());
1721   return true;
1722 }
1723 
1724 template <typename Action>
SendMessage(ActionType type,const Action & action,SessionError * error)1725 bool Session::SendMessage(ActionType type, const Action& action,
1726                           SessionError* error) {
1727   rtc::scoped_ptr<buzz::XmlElement> stanza(
1728       new buzz::XmlElement(buzz::QN_IQ));
1729   if (!WriteActionMessage(type, action, stanza.get(), error))
1730     return false;
1731 
1732   SignalOutgoingMessage(this, stanza.get());
1733   return true;
1734 }
1735 
1736 template <typename Action>
WriteActionMessage(ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1737 bool Session::WriteActionMessage(ActionType type, const Action& action,
1738                                  buzz::XmlElement* stanza,
1739                                  WriteError* error) {
1740   if (current_protocol_ == PROTOCOL_HYBRID) {
1741     if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
1742       return false;
1743     if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
1744       return false;
1745   } else {
1746     if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
1747       return false;
1748   }
1749   return true;
1750 }
1751 
1752 template <typename Action>
WriteActionMessage(SignalingProtocol protocol,ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1753 bool Session::WriteActionMessage(SignalingProtocol protocol,
1754                                  ActionType type, const Action& action,
1755                                  buzz::XmlElement* stanza, WriteError* error) {
1756   XmlElements action_elems;
1757   if (!WriteSessionAction(protocol, action, &action_elems, error))
1758     return false;
1759 
1760   SessionMessage msg(protocol, type, id(), initiator_name());
1761   msg.to = remote_name();
1762 
1763   WriteSessionMessage(msg, action_elems, stanza);
1764   return true;
1765 }
1766 
SendAcknowledgementMessage(const buzz::XmlElement * stanza)1767 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
1768   rtc::scoped_ptr<buzz::XmlElement> ack(
1769       new buzz::XmlElement(buzz::QN_IQ));
1770   ack->SetAttr(buzz::QN_TO, remote_name());
1771   ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
1772   ack->SetAttr(buzz::QN_TYPE, "result");
1773 
1774   SignalOutgoingMessage(this, ack.get());
1775 }
1776 
1777 }  // namespace cricket
1778