• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/p2p/base/session.h"
29 
30 #include "talk/base/bind.h"
31 #include "talk/base/common.h"
32 #include "talk/base/logging.h"
33 #include "talk/base/helpers.h"
34 #include "talk/base/scoped_ptr.h"
35 #include "talk/base/sslstreamadapter.h"
36 #include "talk/xmpp/constants.h"
37 #include "talk/xmpp/jid.h"
38 #include "talk/p2p/base/dtlstransport.h"
39 #include "talk/p2p/base/p2ptransport.h"
40 #include "talk/p2p/base/sessionclient.h"
41 #include "talk/p2p/base/transport.h"
42 #include "talk/p2p/base/transportchannelproxy.h"
43 #include "talk/p2p/base/transportinfo.h"
44 
45 #include "talk/p2p/base/constants.h"
46 
47 namespace cricket {
48 
49 using talk_base::Bind;
50 
BadMessage(const buzz::QName type,const std::string & text,MessageError * err)51 bool BadMessage(const buzz::QName type,
52                 const std::string& text,
53                 MessageError* err) {
54   err->SetType(type);
55   err->SetText(text);
56   return false;
57 }
58 
~TransportProxy()59 TransportProxy::~TransportProxy() {
60   for (ChannelMap::iterator iter = channels_.begin();
61        iter != channels_.end(); ++iter) {
62     iter->second->SignalDestroyed(iter->second);
63     delete iter->second;
64   }
65 }
66 
type() const67 std::string TransportProxy::type() const {
68   return transport_->get()->type();
69 }
70 
GetChannel(int component)71 TransportChannel* TransportProxy::GetChannel(int component) {
72   ASSERT(talk_base::Thread::Current() == worker_thread_);
73   return GetChannelProxy(component);
74 }
75 
CreateChannel(const std::string & name,int component)76 TransportChannel* TransportProxy::CreateChannel(
77     const std::string& name, int component) {
78   ASSERT(talk_base::Thread::Current() == worker_thread_);
79   ASSERT(GetChannel(component) == NULL);
80   ASSERT(!transport_->get()->HasChannel(component));
81 
82   // We always create a proxy in case we need to change out the transport later.
83   TransportChannelProxy* channel =
84       new TransportChannelProxy(content_name(), name, component);
85   channels_[component] = channel;
86 
87   // If we're already negotiated, create an impl and hook it up to the proxy
88   // channel. If we're connecting, create an impl but don't hook it up yet.
89   if (negotiated_) {
90     SetupChannelProxy_w(component, channel);
91   } else if (connecting_) {
92     GetOrCreateChannelProxyImpl_w(component);
93   }
94   return channel;
95 }
96 
HasChannel(int component)97 bool TransportProxy::HasChannel(int component) {
98   return transport_->get()->HasChannel(component);
99 }
100 
DestroyChannel(int component)101 void TransportProxy::DestroyChannel(int component) {
102   ASSERT(talk_base::Thread::Current() == worker_thread_);
103   TransportChannel* channel = GetChannel(component);
104   if (channel) {
105     // If the state of TransportProxy is not NEGOTIATED
106     // then TransportChannelProxy and its impl are not
107     // connected. Both must be connected before
108     // deletion.
109     if (!negotiated_) {
110       SetupChannelProxy_w(component, GetChannelProxy(component));
111     }
112 
113     channels_.erase(component);
114     channel->SignalDestroyed(channel);
115     delete channel;
116   }
117 }
118 
ConnectChannels()119 void TransportProxy::ConnectChannels() {
120   if (!connecting_) {
121     if (!negotiated_) {
122       for (ChannelMap::iterator iter = channels_.begin();
123            iter != channels_.end(); ++iter) {
124         GetOrCreateChannelProxyImpl(iter->first);
125       }
126     }
127     connecting_ = true;
128   }
129   // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
130   // don't have any channels yet, so we need to allow this method to be called
131   // multiple times. Once we fix Transport, we can move this call inside the
132   // if (!connecting_) block.
133   transport_->get()->ConnectChannels();
134 }
135 
CompleteNegotiation()136 void TransportProxy::CompleteNegotiation() {
137   if (!negotiated_) {
138     for (ChannelMap::iterator iter = channels_.begin();
139          iter != channels_.end(); ++iter) {
140       SetupChannelProxy(iter->first, iter->second);
141     }
142     negotiated_ = true;
143   }
144 }
145 
AddSentCandidates(const Candidates & candidates)146 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
147   for (Candidates::const_iterator cand = candidates.begin();
148        cand != candidates.end(); ++cand) {
149     sent_candidates_.push_back(*cand);
150   }
151 }
152 
AddUnsentCandidates(const Candidates & candidates)153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
154   for (Candidates::const_iterator cand = candidates.begin();
155        cand != candidates.end(); ++cand) {
156     unsent_candidates_.push_back(*cand);
157   }
158 }
159 
GetChannelNameFromComponent(int component,std::string * channel_name) const160 bool TransportProxy::GetChannelNameFromComponent(
161     int component, std::string* channel_name) const {
162   const TransportChannelProxy* channel = GetChannelProxy(component);
163   if (channel == NULL) {
164     return false;
165   }
166 
167   *channel_name = channel->name();
168   return true;
169 }
170 
GetComponentFromChannelName(const std::string & channel_name,int * component) const171 bool TransportProxy::GetComponentFromChannelName(
172     const std::string& channel_name, int* component) const {
173   const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
174   if (channel == NULL) {
175     return false;
176   }
177 
178   *component = channel->component();
179   return true;
180 }
181 
GetChannelProxy(int component) const182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
183   ChannelMap::const_iterator iter = channels_.find(component);
184   return (iter != channels_.end()) ? iter->second : NULL;
185 }
186 
GetChannelProxyByName(const std::string & name) const187 TransportChannelProxy* TransportProxy::GetChannelProxyByName(
188     const std::string& name) const {
189   for (ChannelMap::const_iterator iter = channels_.begin();
190        iter != channels_.end();
191        ++iter) {
192     if (iter->second->name() == name) {
193       return iter->second;
194     }
195   }
196   return NULL;
197 }
198 
GetOrCreateChannelProxyImpl(int component)199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
200     int component) {
201   return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
202       &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
203 }
204 
GetOrCreateChannelProxyImpl_w(int component)205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w(
206     int component) {
207   ASSERT(talk_base::Thread::Current() == worker_thread_);
208   TransportChannelImpl* impl = transport_->get()->GetChannel(component);
209   if (impl == NULL) {
210     impl = transport_->get()->CreateChannel(component);
211   }
212   return impl;
213 }
214 
SetupChannelProxy(int component,TransportChannelProxy * transproxy)215 void TransportProxy::SetupChannelProxy(
216     int component, TransportChannelProxy* transproxy) {
217   worker_thread_->Invoke<void>(Bind(
218       &TransportProxy::SetupChannelProxy_w, this, component, transproxy));
219 }
220 
SetupChannelProxy_w(int component,TransportChannelProxy * transproxy)221 void TransportProxy::SetupChannelProxy_w(
222     int component, TransportChannelProxy* transproxy) {
223   ASSERT(talk_base::Thread::Current() == worker_thread_);
224   TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
225   ASSERT(impl != NULL);
226   transproxy->SetImplementation(impl);
227 }
228 
ReplaceChannelProxyImpl(TransportChannelProxy * proxy,TransportChannelImpl * impl)229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
230                                              TransportChannelImpl* impl) {
231   worker_thread_->Invoke<void>(Bind(
232       &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl));
233 }
234 
ReplaceChannelProxyImpl_w(TransportChannelProxy * proxy,TransportChannelImpl * impl)235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy,
236                                                TransportChannelImpl* impl) {
237   ASSERT(talk_base::Thread::Current() == worker_thread_);
238   ASSERT(proxy != NULL);
239   proxy->SetImplementation(impl);
240 }
241 
242 // This function muxes |this| onto |target| by repointing |this| at
243 // |target|'s transport and setting our TransportChannelProxies
244 // to point to |target|'s underlying implementations.
SetupMux(TransportProxy * target)245 bool TransportProxy::SetupMux(TransportProxy* target) {
246   // Bail out if there's nothing to do.
247   if (transport_ == target->transport_) {
248     return true;
249   }
250 
251   // Run through all channels and remove any non-rtp transport channels before
252   // setting target transport channels.
253   for (ChannelMap::const_iterator iter = channels_.begin();
254        iter != channels_.end(); ++iter) {
255     if (!target->transport_->get()->HasChannel(iter->first)) {
256       // Remove if channel doesn't exist in |transport_|.
257       ReplaceChannelProxyImpl(iter->second, NULL);
258     } else {
259       // Replace the impl for all the TransportProxyChannels with the channels
260       // from |target|'s transport. Fail if there's not an exact match.
261       ReplaceChannelProxyImpl(
262           iter->second, target->transport_->get()->CreateChannel(iter->first));
263     }
264   }
265 
266   // Now replace our transport. Must happen afterwards because
267   // it deletes all impls as a side effect.
268   transport_ = target->transport_;
269   transport_->get()->SignalCandidatesReady.connect(
270       this, &TransportProxy::OnTransportCandidatesReady);
271   set_candidates_allocated(target->candidates_allocated());
272   return true;
273 }
274 
SetIceRole(IceRole role)275 void TransportProxy::SetIceRole(IceRole role) {
276   transport_->get()->SetIceRole(role);
277 }
278 
SetLocalTransportDescription(const TransportDescription & description,ContentAction action,std::string * error_desc)279 bool TransportProxy::SetLocalTransportDescription(
280     const TransportDescription& description,
281     ContentAction action,
282     std::string* error_desc) {
283   // If this is an answer, finalize the negotiation.
284   if (action == CA_ANSWER) {
285     CompleteNegotiation();
286   }
287   bool result = transport_->get()->SetLocalTransportDescription(description,
288                                                                 action,
289                                                                 error_desc);
290   if (result)
291     local_description_set_ = true;
292   return result;
293 }
294 
SetRemoteTransportDescription(const TransportDescription & description,ContentAction action,std::string * error_desc)295 bool TransportProxy::SetRemoteTransportDescription(
296     const TransportDescription& description,
297     ContentAction action,
298     std::string* error_desc) {
299   // If this is an answer, finalize the negotiation.
300   if (action == CA_ANSWER) {
301     CompleteNegotiation();
302   }
303   bool result = transport_->get()->SetRemoteTransportDescription(description,
304                                                                  action,
305                                                                  error_desc);
306   if (result)
307     remote_description_set_ = true;
308   return result;
309 }
310 
OnSignalingReady()311 void TransportProxy::OnSignalingReady() {
312   // If we're starting a new allocation sequence, reset our state.
313   set_candidates_allocated(false);
314   transport_->get()->OnSignalingReady();
315 }
316 
OnRemoteCandidates(const Candidates & candidates,std::string * error)317 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
318                                         std::string* error) {
319   // Ensure the transport is negotiated before handling candidates.
320   // TODO(juberti): Remove this once everybody calls SetLocalTD.
321   CompleteNegotiation();
322 
323   // Verify each candidate before passing down to transport layer.
324   for (Candidates::const_iterator cand = candidates.begin();
325        cand != candidates.end(); ++cand) {
326     if (!transport_->get()->VerifyCandidate(*cand, error))
327       return false;
328     if (!HasChannel(cand->component())) {
329       *error = "Candidate has unknown component: " + cand->ToString() +
330                " for content: " + content_name_;
331       return false;
332     }
333   }
334   transport_->get()->OnRemoteCandidates(candidates);
335   return true;
336 }
337 
SetIdentity(talk_base::SSLIdentity * identity)338 void TransportProxy::SetIdentity(
339     talk_base::SSLIdentity* identity) {
340   transport_->get()->SetIdentity(identity);
341 }
342 
StateToString(State state)343 std::string BaseSession::StateToString(State state) {
344   switch (state) {
345     case Session::STATE_INIT:
346       return "STATE_INIT";
347     case Session::STATE_SENTINITIATE:
348       return "STATE_SENTINITIATE";
349     case Session::STATE_RECEIVEDINITIATE:
350       return "STATE_RECEIVEDINITIATE";
351     case Session::STATE_SENTPRACCEPT:
352       return "STATE_SENTPRACCEPT";
353     case Session::STATE_SENTACCEPT:
354       return "STATE_SENTACCEPT";
355     case Session::STATE_RECEIVEDPRACCEPT:
356       return "STATE_RECEIVEDPRACCEPT";
357     case Session::STATE_RECEIVEDACCEPT:
358       return "STATE_RECEIVEDACCEPT";
359     case Session::STATE_SENTMODIFY:
360       return "STATE_SENTMODIFY";
361     case Session::STATE_RECEIVEDMODIFY:
362       return "STATE_RECEIVEDMODIFY";
363     case Session::STATE_SENTREJECT:
364       return "STATE_SENTREJECT";
365     case Session::STATE_RECEIVEDREJECT:
366       return "STATE_RECEIVEDREJECT";
367     case Session::STATE_SENTREDIRECT:
368       return "STATE_SENTREDIRECT";
369     case Session::STATE_SENTTERMINATE:
370       return "STATE_SENTTERMINATE";
371     case Session::STATE_RECEIVEDTERMINATE:
372       return "STATE_RECEIVEDTERMINATE";
373     case Session::STATE_INPROGRESS:
374       return "STATE_INPROGRESS";
375     case Session::STATE_DEINIT:
376       return "STATE_DEINIT";
377     default:
378       break;
379   }
380   return "STATE_" + talk_base::ToString(state);
381 }
382 
BaseSession(talk_base::Thread * signaling_thread,talk_base::Thread * worker_thread,PortAllocator * port_allocator,const std::string & sid,const std::string & content_type,bool initiator)383 BaseSession::BaseSession(talk_base::Thread* signaling_thread,
384                          talk_base::Thread* worker_thread,
385                          PortAllocator* port_allocator,
386                          const std::string& sid,
387                          const std::string& content_type,
388                          bool initiator)
389     : state_(STATE_INIT),
390       error_(ERROR_NONE),
391       signaling_thread_(signaling_thread),
392       worker_thread_(worker_thread),
393       port_allocator_(port_allocator),
394       sid_(sid),
395       content_type_(content_type),
396       transport_type_(NS_GINGLE_P2P),
397       initiator_(initiator),
398       identity_(NULL),
399       local_description_(NULL),
400       remote_description_(NULL),
401       ice_tiebreaker_(talk_base::CreateRandomId64()),
402       role_switch_(false) {
403   ASSERT(signaling_thread->IsCurrent());
404 }
405 
~BaseSession()406 BaseSession::~BaseSession() {
407   ASSERT(signaling_thread()->IsCurrent());
408 
409   ASSERT(state_ != STATE_DEINIT);
410   LogState(state_, STATE_DEINIT);
411   state_ = STATE_DEINIT;
412   SignalState(this, state_);
413 
414   for (TransportMap::iterator iter = transports_.begin();
415        iter != transports_.end(); ++iter) {
416     delete iter->second;
417   }
418 
419   delete remote_description_;
420   delete local_description_;
421 }
422 
SetIdentity(talk_base::SSLIdentity * identity)423 bool BaseSession::SetIdentity(talk_base::SSLIdentity* identity) {
424   if (identity_)
425     return false;
426   identity_ = identity;
427   for (TransportMap::iterator iter = transports_.begin();
428        iter != transports_.end(); ++iter) {
429     iter->second->SetIdentity(identity_);
430   }
431   return true;
432 }
433 
PushdownTransportDescription(ContentSource source,ContentAction action,std::string * error_desc)434 bool BaseSession::PushdownTransportDescription(ContentSource source,
435                                                ContentAction action,
436                                                std::string* error_desc) {
437   if (source == CS_LOCAL) {
438     return PushdownLocalTransportDescription(local_description_,
439                                              action,
440                                              error_desc);
441   }
442   return PushdownRemoteTransportDescription(remote_description_,
443                                             action,
444                                             error_desc);
445 }
446 
PushdownLocalTransportDescription(const SessionDescription * sdesc,ContentAction action,std::string * error_desc)447 bool BaseSession::PushdownLocalTransportDescription(
448     const SessionDescription* sdesc,
449     ContentAction action,
450     std::string* error_desc) {
451   // Update the Transports with the right information, and trigger them to
452   // start connecting.
453   for (TransportMap::iterator iter = transports_.begin();
454        iter != transports_.end(); ++iter) {
455     // If no transport info was in this session description, ret == false
456     // and we just skip this one.
457     TransportDescription tdesc;
458     bool ret = GetTransportDescription(
459         sdesc, iter->second->content_name(), &tdesc);
460     if (ret) {
461       if (!iter->second->SetLocalTransportDescription(tdesc, action,
462                                                       error_desc)) {
463         return false;
464       }
465 
466       iter->second->ConnectChannels();
467     }
468   }
469 
470   return true;
471 }
472 
PushdownRemoteTransportDescription(const SessionDescription * sdesc,ContentAction action,std::string * error_desc)473 bool BaseSession::PushdownRemoteTransportDescription(
474     const SessionDescription* sdesc,
475     ContentAction action,
476     std::string* error_desc) {
477   // Update the Transports with the right information.
478   for (TransportMap::iterator iter = transports_.begin();
479        iter != transports_.end(); ++iter) {
480     TransportDescription tdesc;
481 
482     // If no transport info was in this session description, ret == false
483     // and we just skip this one.
484     bool ret = GetTransportDescription(
485         sdesc, iter->second->content_name(), &tdesc);
486     if (ret) {
487       if (!iter->second->SetRemoteTransportDescription(tdesc, action,
488                                                        error_desc)) {
489         return false;
490       }
491     }
492   }
493 
494   return true;
495 }
496 
CreateChannel(const std::string & content_name,const std::string & channel_name,int component)497 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
498                                              const std::string& channel_name,
499                                              int component) {
500   // We create the proxy "on demand" here because we need to support
501   // creating channels at any time, even before we send or receive
502   // initiate messages, which is before we create the transports.
503   TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
504   return transproxy->CreateChannel(channel_name, component);
505 }
506 
GetChannel(const std::string & content_name,int component)507 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
508                                           int component) {
509   TransportProxy* transproxy = GetTransportProxy(content_name);
510   if (transproxy == NULL)
511     return NULL;
512   else
513     return transproxy->GetChannel(component);
514 }
515 
DestroyChannel(const std::string & content_name,int component)516 void BaseSession::DestroyChannel(const std::string& content_name,
517                                  int component) {
518   TransportProxy* transproxy = GetTransportProxy(content_name);
519   ASSERT(transproxy != NULL);
520   transproxy->DestroyChannel(component);
521 }
522 
GetOrCreateTransportProxy(const std::string & content_name)523 TransportProxy* BaseSession::GetOrCreateTransportProxy(
524     const std::string& content_name) {
525   TransportProxy* transproxy = GetTransportProxy(content_name);
526   if (transproxy)
527     return transproxy;
528 
529   Transport* transport = CreateTransport(content_name);
530   transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
531   transport->SetIceTiebreaker(ice_tiebreaker_);
532   // TODO: Connect all the Transport signals to TransportProxy
533   // then to the BaseSession.
534   transport->SignalConnecting.connect(
535       this, &BaseSession::OnTransportConnecting);
536   transport->SignalWritableState.connect(
537       this, &BaseSession::OnTransportWritable);
538   transport->SignalRequestSignaling.connect(
539       this, &BaseSession::OnTransportRequestSignaling);
540   transport->SignalTransportError.connect(
541       this, &BaseSession::OnTransportSendError);
542   transport->SignalRouteChange.connect(
543       this, &BaseSession::OnTransportRouteChange);
544   transport->SignalCandidatesAllocationDone.connect(
545       this, &BaseSession::OnTransportCandidatesAllocationDone);
546   transport->SignalRoleConflict.connect(
547       this, &BaseSession::OnRoleConflict);
548   transport->SignalCompleted.connect(
549       this, &BaseSession::OnTransportCompleted);
550   transport->SignalFailed.connect(
551       this, &BaseSession::OnTransportFailed);
552 
553   transproxy = new TransportProxy(worker_thread_, sid_, content_name,
554                                   new TransportWrapper(transport));
555   transproxy->SignalCandidatesReady.connect(
556       this, &BaseSession::OnTransportProxyCandidatesReady);
557   if (identity_)
558     transproxy->SetIdentity(identity_);
559   transports_[content_name] = transproxy;
560 
561   return transproxy;
562 }
563 
GetTransport(const std::string & content_name)564 Transport* BaseSession::GetTransport(const std::string& content_name) {
565   TransportProxy* transproxy = GetTransportProxy(content_name);
566   if (transproxy == NULL)
567     return NULL;
568   return transproxy->impl();
569 }
570 
GetTransportProxy(const std::string & content_name)571 TransportProxy* BaseSession::GetTransportProxy(
572     const std::string& content_name) {
573   TransportMap::iterator iter = transports_.find(content_name);
574   return (iter != transports_.end()) ? iter->second : NULL;
575 }
576 
GetTransportProxy(const Transport * transport)577 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
578   for (TransportMap::iterator iter = transports_.begin();
579        iter != transports_.end(); ++iter) {
580     TransportProxy* transproxy = iter->second;
581     if (transproxy->impl() == transport) {
582       return transproxy;
583     }
584   }
585   return NULL;
586 }
587 
GetFirstTransportProxy()588 TransportProxy* BaseSession::GetFirstTransportProxy() {
589   if (transports_.empty())
590     return NULL;
591   return transports_.begin()->second;
592 }
593 
DestroyTransportProxy(const std::string & content_name)594 void BaseSession::DestroyTransportProxy(
595     const std::string& content_name) {
596   TransportMap::iterator iter = transports_.find(content_name);
597   if (iter != transports_.end()) {
598     delete iter->second;
599     transports_.erase(content_name);
600   }
601 }
602 
CreateTransport(const std::string & content_name)603 cricket::Transport* BaseSession::CreateTransport(
604     const std::string& content_name) {
605   ASSERT(transport_type_ == NS_GINGLE_P2P);
606   return new cricket::DtlsTransport<P2PTransport>(
607       signaling_thread(), worker_thread(), content_name,
608       port_allocator(), identity_);
609 }
610 
GetStats(SessionStats * stats)611 bool BaseSession::GetStats(SessionStats* stats) {
612   for (TransportMap::iterator iter = transports_.begin();
613        iter != transports_.end(); ++iter) {
614     std::string proxy_id = iter->second->content_name();
615     // We are ignoring not-yet-instantiated transports.
616     if (iter->second->impl()) {
617       std::string transport_id = iter->second->impl()->content_name();
618       stats->proxy_to_transport[proxy_id] = transport_id;
619       if (stats->transport_stats.find(transport_id)
620           == stats->transport_stats.end()) {
621         TransportStats subinfos;
622         if (!iter->second->impl()->GetStats(&subinfos)) {
623           return false;
624         }
625         stats->transport_stats[transport_id] = subinfos;
626       }
627     }
628   }
629   return true;
630 }
631 
SetState(State state)632 void BaseSession::SetState(State state) {
633   ASSERT(signaling_thread_->IsCurrent());
634   if (state != state_) {
635     LogState(state_, state);
636     state_ = state;
637     SignalState(this, state_);
638     signaling_thread_->Post(this, MSG_STATE);
639   }
640   SignalNewDescription();
641 }
642 
SetError(Error error,const std::string & error_desc)643 void BaseSession::SetError(Error error, const std::string& error_desc) {
644   ASSERT(signaling_thread_->IsCurrent());
645   if (error != error_) {
646     error_ = error;
647     error_desc_ = error_desc;
648     SignalError(this, error);
649   }
650 }
651 
OnSignalingReady()652 void BaseSession::OnSignalingReady() {
653   ASSERT(signaling_thread()->IsCurrent());
654   for (TransportMap::iterator iter = transports_.begin();
655        iter != transports_.end(); ++iter) {
656     iter->second->OnSignalingReady();
657   }
658 }
659 
660 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to
661 // start, remove this method once everyone calls PushdownLocalTD.
SpeculativelyConnectAllTransportChannels()662 void BaseSession::SpeculativelyConnectAllTransportChannels() {
663   // Put all transports into the connecting state.
664   for (TransportMap::iterator iter = transports_.begin();
665        iter != transports_.end(); ++iter) {
666     iter->second->ConnectChannels();
667   }
668 }
669 
OnRemoteCandidates(const std::string & content_name,const Candidates & candidates,std::string * error)670 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
671                                      const Candidates& candidates,
672                                      std::string* error) {
673   // Give candidates to the appropriate transport, and tell that transport
674   // to start connecting, if it's not already doing so.
675   TransportProxy* transproxy = GetTransportProxy(content_name);
676   if (!transproxy) {
677     *error = "Unknown content name " + content_name;
678     return false;
679   }
680   if (!transproxy->OnRemoteCandidates(candidates, error)) {
681     return false;
682   }
683   // TODO(juberti): Remove this call once we can be sure that we always have
684   // a local transport description (which will trigger the connection).
685   transproxy->ConnectChannels();
686   return true;
687 }
688 
MaybeEnableMuxingSupport()689 bool BaseSession::MaybeEnableMuxingSupport() {
690   // We need both a local and remote description to decide if we should mux.
691   if ((state_ == STATE_SENTINITIATE ||
692       state_ == STATE_RECEIVEDINITIATE) &&
693       ((local_description_ == NULL) ||
694       (remote_description_ == NULL))) {
695     return false;
696   }
697 
698   // In order to perform the multiplexing, we need all proxies to be in the
699   // negotiated state, i.e. to have implementations underneath.
700   // Ensure that this is the case, regardless of whether we are going to mux.
701   for (TransportMap::iterator iter = transports_.begin();
702        iter != transports_.end(); ++iter) {
703     ASSERT(iter->second->negotiated());
704     if (!iter->second->negotiated())
705       return false;
706   }
707 
708   // If both sides agree to BUNDLE, mux all the specified contents onto the
709   // transport belonging to the first content name in the BUNDLE group.
710   // If the contents are already muxed, this will be a no-op.
711   // TODO(juberti): Should this check that local and remote have configured
712   // BUNDLE the same way?
713   bool candidates_allocated = IsCandidateAllocationDone();
714   const ContentGroup* local_bundle_group =
715       local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
716   const ContentGroup* remote_bundle_group =
717       remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
718   if (local_bundle_group && remote_bundle_group &&
719       local_bundle_group->FirstContentName()) {
720     const std::string* content_name = local_bundle_group->FirstContentName();
721     const ContentInfo* content =
722         local_description_->GetContentByName(*content_name);
723     ASSERT(content != NULL);
724     if (!SetSelectedProxy(content->name, local_bundle_group)) {
725       LOG(LS_WARNING) << "Failed to set up BUNDLE";
726       return false;
727     }
728 
729     // If we weren't done gathering before, we might be done now, as a result
730     // of enabling mux.
731     LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
732                  << *content_name;
733     if (!candidates_allocated) {
734       MaybeCandidateAllocationDone();
735     }
736   } else {
737     LOG(LS_INFO) << "No BUNDLE information, not bundling.";
738   }
739   return true;
740 }
741 
SetSelectedProxy(const std::string & content_name,const ContentGroup * muxed_group)742 bool BaseSession::SetSelectedProxy(const std::string& content_name,
743                                    const ContentGroup* muxed_group) {
744   TransportProxy* selected_proxy = GetTransportProxy(content_name);
745   if (!selected_proxy) {
746     return false;
747   }
748 
749   ASSERT(selected_proxy->negotiated());
750   for (TransportMap::iterator iter = transports_.begin();
751        iter != transports_.end(); ++iter) {
752     // If content is part of the mux group, then repoint its proxy at the
753     // transport object that we have chosen to mux onto. If the proxy
754     // is already pointing at the right object, it will be a no-op.
755     if (muxed_group->HasContentName(iter->first) &&
756         !iter->second->SetupMux(selected_proxy)) {
757       return false;
758     }
759   }
760   return true;
761 }
762 
OnTransportCandidatesAllocationDone(Transport * transport)763 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
764   // TODO(juberti): This is a clunky way of processing the done signal. Instead,
765   // TransportProxy should receive the done signal directly, set its allocated
766   // flag internally, and then reissue the done signal to Session.
767   // Overall we should make TransportProxy receive *all* the signals from
768   // Transport, since this removes the need to manually iterate over all
769   // the transports, as is needed to make sure signals are handled properly
770   // when BUNDLEing.
771   // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
772   // that make it prohibitively difficult to run dbg builds. Disabled for now.
773   //ASSERT(!IsCandidateAllocationDone());
774   for (TransportMap::iterator iter = transports_.begin();
775        iter != transports_.end(); ++iter) {
776     if (iter->second->impl() == transport) {
777       iter->second->set_candidates_allocated(true);
778     }
779   }
780   MaybeCandidateAllocationDone();
781 }
782 
IsCandidateAllocationDone() const783 bool BaseSession::IsCandidateAllocationDone() const {
784   for (TransportMap::const_iterator iter = transports_.begin();
785        iter != transports_.end(); ++iter) {
786     if (!iter->second->candidates_allocated())
787       return false;
788   }
789   return true;
790 }
791 
MaybeCandidateAllocationDone()792 void BaseSession::MaybeCandidateAllocationDone() {
793   if (IsCandidateAllocationDone()) {
794     LOG(LS_INFO) << "Candidate gathering is complete.";
795     OnCandidatesAllocationDone();
796   }
797 }
798 
OnRoleConflict()799 void BaseSession::OnRoleConflict() {
800   if (role_switch_) {
801     LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
802     return;
803   }
804 
805   role_switch_ = true;
806   for (TransportMap::iterator iter = transports_.begin();
807        iter != transports_.end(); ++iter) {
808     // Role will be reverse of initial role setting.
809     IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
810     iter->second->SetIceRole(role);
811   }
812 }
813 
LogState(State old_state,State new_state)814 void BaseSession::LogState(State old_state, State new_state) {
815   LOG(LS_INFO) << "Session:" << id()
816                << " Old state:" << StateToString(old_state)
817                << " New state:" << StateToString(new_state)
818                << " Type:" << content_type()
819                << " Transport:" << transport_type();
820 }
821 
GetTransportDescription(const SessionDescription * description,const std::string & content_name,TransportDescription * tdesc)822 bool BaseSession::GetTransportDescription(const SessionDescription* description,
823                                           const std::string& content_name,
824                                           TransportDescription* tdesc) {
825   if (!description || !tdesc) {
826     return false;
827   }
828   const TransportInfo* transport_info =
829       description->GetTransportInfoByName(content_name);
830   if (!transport_info) {
831     return false;
832   }
833   *tdesc = transport_info->description;
834   return true;
835 }
836 
SignalNewDescription()837 void BaseSession::SignalNewDescription() {
838   ContentAction action;
839   ContentSource source;
840   if (!GetContentAction(&action, &source)) {
841     return;
842   }
843   if (source == CS_LOCAL) {
844     SignalNewLocalDescription(this, action);
845   } else {
846     SignalNewRemoteDescription(this, action);
847   }
848 }
849 
GetContentAction(ContentAction * action,ContentSource * source)850 bool BaseSession::GetContentAction(ContentAction* action,
851                                    ContentSource* source) {
852   switch (state_) {
853     // new local description
854     case STATE_SENTINITIATE:
855       *action = CA_OFFER;
856       *source = CS_LOCAL;
857       break;
858     case STATE_SENTPRACCEPT:
859       *action = CA_PRANSWER;
860       *source = CS_LOCAL;
861       break;
862     case STATE_SENTACCEPT:
863       *action = CA_ANSWER;
864       *source = CS_LOCAL;
865       break;
866     // new remote description
867     case STATE_RECEIVEDINITIATE:
868       *action = CA_OFFER;
869       *source = CS_REMOTE;
870       break;
871     case STATE_RECEIVEDPRACCEPT:
872       *action = CA_PRANSWER;
873       *source = CS_REMOTE;
874       break;
875     case STATE_RECEIVEDACCEPT:
876       *action = CA_ANSWER;
877       *source = CS_REMOTE;
878       break;
879     default:
880       return false;
881   }
882   return true;
883 }
884 
OnMessage(talk_base::Message * pmsg)885 void BaseSession::OnMessage(talk_base::Message *pmsg) {
886   switch (pmsg->message_id) {
887   case MSG_TIMEOUT:
888     // Session timeout has occured.
889     SetError(ERROR_TIME, "Session timeout has occured.");
890     break;
891 
892   case MSG_STATE:
893     switch (state_) {
894     case STATE_SENTACCEPT:
895     case STATE_RECEIVEDACCEPT:
896       SetState(STATE_INPROGRESS);
897       break;
898 
899     default:
900       // Explicitly ignoring some states here.
901       break;
902     }
903     break;
904   }
905 }
906 
Session(SessionManager * session_manager,const std::string & local_name,const std::string & initiator_name,const std::string & sid,const std::string & content_type,SessionClient * client)907 Session::Session(SessionManager* session_manager,
908                  const std::string& local_name,
909                  const std::string& initiator_name,
910                  const std::string& sid,
911                  const std::string& content_type,
912                  SessionClient* client)
913     : BaseSession(session_manager->signaling_thread(),
914                   session_manager->worker_thread(),
915                   session_manager->port_allocator(),
916                   sid, content_type, initiator_name == local_name) {
917   ASSERT(client != NULL);
918   session_manager_ = session_manager;
919   local_name_ = local_name;
920   initiator_name_ = initiator_name;
921   transport_parser_ = new P2PTransportParser();
922   client_ = client;
923   initiate_acked_ = false;
924   current_protocol_ = PROTOCOL_HYBRID;
925 }
926 
~Session()927 Session::~Session() {
928   delete transport_parser_;
929 }
930 
Initiate(const std::string & to,const SessionDescription * sdesc)931 bool Session::Initiate(const std::string &to,
932                        const SessionDescription* sdesc) {
933   ASSERT(signaling_thread()->IsCurrent());
934   SessionError error;
935 
936   // Only from STATE_INIT
937   if (state() != STATE_INIT)
938     return false;
939 
940   // Setup for signaling.
941   set_remote_name(to);
942   set_local_description(sdesc);
943   if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
944                               &error)) {
945     LOG(LS_ERROR) << "Could not create transports: " << error.text;
946     return false;
947   }
948 
949   if (!SendInitiateMessage(sdesc, &error)) {
950     LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
951     return false;
952   }
953 
954   // We need to connect transport proxy and impl here so that we can process
955   // the TransportDescriptions.
956   SpeculativelyConnectAllTransportChannels();
957 
958   PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL);
959   SetState(Session::STATE_SENTINITIATE);
960   return true;
961 }
962 
Accept(const SessionDescription * sdesc)963 bool Session::Accept(const SessionDescription* sdesc) {
964   ASSERT(signaling_thread()->IsCurrent());
965 
966   // Only if just received initiate
967   if (state() != STATE_RECEIVEDINITIATE)
968     return false;
969 
970   // Setup for signaling.
971   set_local_description(sdesc);
972 
973   SessionError error;
974   if (!SendAcceptMessage(sdesc, &error)) {
975     LOG(LS_ERROR) << "Could not send accept message: " << error.text;
976     return false;
977   }
978   // TODO(juberti): Add BUNDLE support to transport-info messages.
979   PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL);
980   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
981   SetState(Session::STATE_SENTACCEPT);
982   return true;
983 }
984 
Reject(const std::string & reason)985 bool Session::Reject(const std::string& reason) {
986   ASSERT(signaling_thread()->IsCurrent());
987 
988   // Reject is sent in response to an initiate or modify, to reject the
989   // request
990   if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
991     return false;
992 
993   SessionError error;
994   if (!SendRejectMessage(reason, &error)) {
995     LOG(LS_ERROR) << "Could not send reject message: " << error.text;
996     return false;
997   }
998 
999   SetState(STATE_SENTREJECT);
1000   return true;
1001 }
1002 
TerminateWithReason(const std::string & reason)1003 bool Session::TerminateWithReason(const std::string& reason) {
1004   ASSERT(signaling_thread()->IsCurrent());
1005 
1006   // Either side can terminate, at any time.
1007   switch (state()) {
1008     case STATE_SENTTERMINATE:
1009     case STATE_RECEIVEDTERMINATE:
1010       return false;
1011 
1012     case STATE_SENTREJECT:
1013     case STATE_RECEIVEDREJECT:
1014       // We don't need to send terminate if we sent or received a reject...
1015       // it's implicit.
1016       break;
1017 
1018     default:
1019       SessionError error;
1020       if (!SendTerminateMessage(reason, &error)) {
1021         LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
1022         return false;
1023       }
1024       break;
1025   }
1026 
1027   SetState(STATE_SENTTERMINATE);
1028   return true;
1029 }
1030 
SendInfoMessage(const XmlElements & elems,const std::string & remote_name)1031 bool Session::SendInfoMessage(const XmlElements& elems,
1032                               const std::string& remote_name) {
1033   ASSERT(signaling_thread()->IsCurrent());
1034   SessionError error;
1035   if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
1036     LOG(LS_ERROR) << "Could not send info message " << error.text;
1037     return false;
1038   }
1039   return true;
1040 }
1041 
SendDescriptionInfoMessage(const ContentInfos & contents)1042 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
1043   XmlElements elems;
1044   WriteError write_error;
1045   if (!WriteDescriptionInfo(current_protocol_,
1046                             contents,
1047                             GetContentParsers(),
1048                             &elems, &write_error)) {
1049     LOG(LS_ERROR) << "Could not write description info message: "
1050                   << write_error.text;
1051     return false;
1052   }
1053   SessionError error;
1054   if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
1055     LOG(LS_ERROR) << "Could not send description info message: "
1056                   << error.text;
1057     return false;
1058   }
1059   return true;
1060 }
1061 
GetEmptyTransportInfos(const ContentInfos & contents) const1062 TransportInfos Session::GetEmptyTransportInfos(
1063     const ContentInfos& contents) const {
1064   TransportInfos tinfos;
1065   for (ContentInfos::const_iterator content = contents.begin();
1066        content != contents.end(); ++content) {
1067     tinfos.push_back(TransportInfo(content->name,
1068                                    TransportDescription(transport_type(),
1069                                                         std::string(),
1070                                                         std::string())));
1071   }
1072   return tinfos;
1073 }
1074 
OnRemoteCandidates(const TransportInfos & tinfos,ParseError * error)1075 bool Session::OnRemoteCandidates(
1076     const TransportInfos& tinfos, ParseError* error) {
1077   for (TransportInfos::const_iterator tinfo = tinfos.begin();
1078        tinfo != tinfos.end(); ++tinfo) {
1079     std::string str_error;
1080     if (!BaseSession::OnRemoteCandidates(
1081         tinfo->content_name, tinfo->description.candidates, &str_error)) {
1082       return BadParse(str_error, error);
1083     }
1084   }
1085   return true;
1086 }
1087 
CreateTransportProxies(const TransportInfos & tinfos,SessionError * error)1088 bool Session::CreateTransportProxies(const TransportInfos& tinfos,
1089                                      SessionError* error) {
1090   for (TransportInfos::const_iterator tinfo = tinfos.begin();
1091        tinfo != tinfos.end(); ++tinfo) {
1092     if (tinfo->description.transport_type != transport_type()) {
1093       error->SetText("No supported transport in offer.");
1094       return false;
1095     }
1096 
1097     GetOrCreateTransportProxy(tinfo->content_name);
1098   }
1099   return true;
1100 }
1101 
GetTransportParsers()1102 TransportParserMap Session::GetTransportParsers() {
1103   TransportParserMap parsers;
1104   parsers[transport_type()] = transport_parser_;
1105   return parsers;
1106 }
1107 
GetCandidateTranslators()1108 CandidateTranslatorMap Session::GetCandidateTranslators() {
1109   CandidateTranslatorMap translators;
1110   // NOTE: This technique makes it impossible to parse G-ICE
1111   // candidates in session-initiate messages because the channels
1112   // aren't yet created at that point.  Since we don't use candidates
1113   // in session-initiate messages, we should be OK.  Once we switch to
1114   // ICE, this translation shouldn't be necessary.
1115   for (TransportMap::const_iterator iter = transport_proxies().begin();
1116        iter != transport_proxies().end(); ++iter) {
1117     translators[iter->first] = iter->second;
1118   }
1119   return translators;
1120 }
1121 
GetContentParsers()1122 ContentParserMap Session::GetContentParsers() {
1123   ContentParserMap parsers;
1124   parsers[content_type()] = client_;
1125   // We need to be able parse both RTP-based and SCTP-based Jingle
1126   // with the same client.
1127   if (content_type() == NS_JINGLE_RTP) {
1128     parsers[NS_JINGLE_DRAFT_SCTP] = client_;
1129   }
1130   return parsers;
1131 }
1132 
OnTransportRequestSignaling(Transport * transport)1133 void Session::OnTransportRequestSignaling(Transport* transport) {
1134   ASSERT(signaling_thread()->IsCurrent());
1135   TransportProxy* transproxy = GetTransportProxy(transport);
1136   ASSERT(transproxy != NULL);
1137   if (transproxy) {
1138     // Reset candidate allocation status for the transport proxy.
1139     transproxy->set_candidates_allocated(false);
1140   }
1141   SignalRequestSignaling(this);
1142 }
1143 
OnTransportConnecting(Transport * transport)1144 void Session::OnTransportConnecting(Transport* transport) {
1145   // This is an indication that we should begin watching the writability
1146   // state of the transport.
1147   OnTransportWritable(transport);
1148 }
1149 
OnTransportWritable(Transport * transport)1150 void Session::OnTransportWritable(Transport* transport) {
1151   ASSERT(signaling_thread()->IsCurrent());
1152 
1153   // If the transport is not writable, start a timer to make sure that it
1154   // becomes writable within a reasonable amount of time.  If it does not, we
1155   // terminate since we can't actually send data.  If the transport is writable,
1156   // cancel the timer.  Note that writability transitions may occur repeatedly
1157   // during the lifetime of the session.
1158   signaling_thread()->Clear(this, MSG_TIMEOUT);
1159   if (transport->HasChannels() && !transport->writable()) {
1160     signaling_thread()->PostDelayed(
1161         session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
1162   }
1163 }
1164 
OnTransportProxyCandidatesReady(TransportProxy * transproxy,const Candidates & candidates)1165 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
1166                                               const Candidates& candidates) {
1167   ASSERT(signaling_thread()->IsCurrent());
1168   if (transproxy != NULL) {
1169     if (initiator() && !initiate_acked_) {
1170       // TODO: This is to work around server re-ordering
1171       // messages.  We send the candidates once the session-initiate
1172       // is acked.  Once we have fixed the server to guarantee message
1173       // order, we can remove this case.
1174       transproxy->AddUnsentCandidates(candidates);
1175     } else {
1176       if (!transproxy->negotiated()) {
1177         transproxy->AddSentCandidates(candidates);
1178       }
1179       SessionError error;
1180       if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
1181         LOG(LS_ERROR) << "Could not send transport info message: "
1182                       << error.text;
1183         return;
1184       }
1185     }
1186   }
1187 }
1188 
OnTransportSendError(Transport * transport,const buzz::XmlElement * stanza,const buzz::QName & name,const std::string & type,const std::string & text,const buzz::XmlElement * extra_info)1189 void Session::OnTransportSendError(Transport* transport,
1190                                    const buzz::XmlElement* stanza,
1191                                    const buzz::QName& name,
1192                                    const std::string& type,
1193                                    const std::string& text,
1194                                    const buzz::XmlElement* extra_info) {
1195   ASSERT(signaling_thread()->IsCurrent());
1196   SignalErrorMessage(this, stanza, name, type, text, extra_info);
1197 }
1198 
OnIncomingMessage(const SessionMessage & msg)1199 void Session::OnIncomingMessage(const SessionMessage& msg) {
1200   ASSERT(signaling_thread()->IsCurrent());
1201   ASSERT(state() == STATE_INIT || msg.from == remote_name());
1202 
1203   if (current_protocol_== PROTOCOL_HYBRID) {
1204     if (msg.protocol == PROTOCOL_GINGLE) {
1205       current_protocol_ = PROTOCOL_GINGLE;
1206     } else {
1207       current_protocol_ = PROTOCOL_JINGLE;
1208     }
1209   }
1210 
1211   bool valid = false;
1212   MessageError error;
1213   switch (msg.type) {
1214     case ACTION_SESSION_INITIATE:
1215       valid = OnInitiateMessage(msg, &error);
1216       break;
1217     case ACTION_SESSION_INFO:
1218       valid = OnInfoMessage(msg);
1219       break;
1220     case ACTION_SESSION_ACCEPT:
1221       valid = OnAcceptMessage(msg, &error);
1222       break;
1223     case ACTION_SESSION_REJECT:
1224       valid = OnRejectMessage(msg, &error);
1225       break;
1226     case ACTION_SESSION_TERMINATE:
1227       valid = OnTerminateMessage(msg, &error);
1228       break;
1229     case ACTION_TRANSPORT_INFO:
1230       valid = OnTransportInfoMessage(msg, &error);
1231       break;
1232     case ACTION_TRANSPORT_ACCEPT:
1233       valid = OnTransportAcceptMessage(msg, &error);
1234       break;
1235     case ACTION_DESCRIPTION_INFO:
1236       valid = OnDescriptionInfoMessage(msg, &error);
1237       break;
1238     default:
1239       valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
1240                          "unknown session message type",
1241                          &error);
1242   }
1243 
1244   if (valid) {
1245     SendAcknowledgementMessage(msg.stanza);
1246   } else {
1247     SignalErrorMessage(this, msg.stanza, error.type,
1248                        "modify", error.text, NULL);
1249   }
1250 }
1251 
OnIncomingResponse(const buzz::XmlElement * orig_stanza,const buzz::XmlElement * response_stanza,const SessionMessage & msg)1252 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
1253                                  const buzz::XmlElement* response_stanza,
1254                                  const SessionMessage& msg) {
1255   ASSERT(signaling_thread()->IsCurrent());
1256 
1257   if (msg.type == ACTION_SESSION_INITIATE) {
1258     OnInitiateAcked();
1259   }
1260 }
1261 
OnInitiateAcked()1262 void Session::OnInitiateAcked() {
1263     // TODO: This is to work around server re-ordering
1264     // messages.  We send the candidates once the session-initiate
1265     // is acked.  Once we have fixed the server to guarantee message
1266     // order, we can remove this case.
1267   if (!initiate_acked_) {
1268     initiate_acked_ = true;
1269     SessionError error;
1270     SendAllUnsentTransportInfoMessages(&error);
1271   }
1272 }
1273 
OnFailedSend(const buzz::XmlElement * orig_stanza,const buzz::XmlElement * error_stanza)1274 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
1275                            const buzz::XmlElement* error_stanza) {
1276   ASSERT(signaling_thread()->IsCurrent());
1277 
1278   SessionMessage msg;
1279   ParseError parse_error;
1280   if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
1281     LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
1282                   << ":" << orig_stanza;
1283     return;
1284   }
1285 
1286   // If the error is a session redirect, call OnRedirectError, which will
1287   // continue the session with a new remote JID.
1288   SessionRedirect redirect;
1289   if (FindSessionRedirect(error_stanza, &redirect)) {
1290     SessionError error;
1291     if (!OnRedirectError(redirect, &error)) {
1292       // TODO: Should we send a message back?  The standard
1293       // says nothing about it.
1294       std::ostringstream desc;
1295       desc << "Failed to redirect: " << error.text;
1296       LOG(LS_ERROR) << desc.str();
1297       SetError(ERROR_RESPONSE, desc.str());
1298     }
1299     return;
1300   }
1301 
1302   std::string error_type = "cancel";
1303 
1304   const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
1305   if (error) {
1306     error_type = error->Attr(buzz::QN_TYPE);
1307 
1308     LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
1309                   << "in response to:\n" << orig_stanza->Str();
1310   } else {
1311     // don't crash if <error> is missing
1312     LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
1313     return;
1314   }
1315 
1316   if (msg.type == ACTION_TRANSPORT_INFO) {
1317     // Transport messages frequently generate errors because they are sent right
1318     // when we detect a network failure.  For that reason, we ignore such
1319     // errors, because if we do not establish writability again, we will
1320     // terminate anyway.  The exceptions are transport-specific error tags,
1321     // which we pass on to the respective transport.
1322   } else if ((error_type != "continue") && (error_type != "wait")) {
1323     // We do not set an error if the other side said it is okay to continue
1324     // (possibly after waiting).  These errors can be ignored.
1325     SetError(ERROR_RESPONSE, "");
1326   }
1327 }
1328 
OnInitiateMessage(const SessionMessage & msg,MessageError * error)1329 bool Session::OnInitiateMessage(const SessionMessage& msg,
1330                                 MessageError* error) {
1331   if (!CheckState(STATE_INIT, error))
1332     return false;
1333 
1334   SessionInitiate init;
1335   if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
1336                             GetContentParsers(), GetTransportParsers(),
1337                             GetCandidateTranslators(),
1338                             &init, error))
1339     return false;
1340 
1341   SessionError session_error;
1342   if (!CreateTransportProxies(init.transports, &session_error)) {
1343     return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
1344                       session_error.text, error);
1345   }
1346 
1347   set_remote_name(msg.from);
1348   set_initiator_name(msg.initiator);
1349   set_remote_description(new SessionDescription(init.ClearContents(),
1350                                                 init.transports,
1351                                                 init.groups));
1352   // Updating transport with TransportDescription.
1353   PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL);
1354   SetState(STATE_RECEIVEDINITIATE);
1355 
1356   // Users of Session may listen to state change and call Reject().
1357   if (state() != STATE_SENTREJECT) {
1358     if (!OnRemoteCandidates(init.transports, error))
1359       return false;
1360 
1361     // TODO(juberti): Auto-generate and push down the local transport answer.
1362     // This is necessary for trickling to work with RFC 5245 ICE.
1363   }
1364   return true;
1365 }
1366 
OnAcceptMessage(const SessionMessage & msg,MessageError * error)1367 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
1368   if (!CheckState(STATE_SENTINITIATE, error))
1369     return false;
1370 
1371   SessionAccept accept;
1372   if (!ParseSessionAccept(msg.protocol, msg.action_elem,
1373                           GetContentParsers(), GetTransportParsers(),
1374                           GetCandidateTranslators(),
1375                           &accept, error)) {
1376     return false;
1377   }
1378 
1379   // If we get an accept, we can assume the initiate has been
1380   // received, even if we haven't gotten an IQ response.
1381   OnInitiateAcked();
1382 
1383   set_remote_description(new SessionDescription(accept.ClearContents(),
1384                                                 accept.transports,
1385                                                 accept.groups));
1386   // Updating transport with TransportDescription.
1387   PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL);
1388   MaybeEnableMuxingSupport();  // Enable transport channel mux if supported.
1389   SetState(STATE_RECEIVEDACCEPT);
1390 
1391   if (!OnRemoteCandidates(accept.transports, error))
1392     return false;
1393 
1394   return true;
1395 }
1396 
OnRejectMessage(const SessionMessage & msg,MessageError * error)1397 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
1398   if (!CheckState(STATE_SENTINITIATE, error))
1399     return false;
1400 
1401   SetState(STATE_RECEIVEDREJECT);
1402   return true;
1403 }
1404 
OnInfoMessage(const SessionMessage & msg)1405 bool Session::OnInfoMessage(const SessionMessage& msg) {
1406   SignalInfoMessage(this, msg.action_elem);
1407   return true;
1408 }
1409 
OnTerminateMessage(const SessionMessage & msg,MessageError * error)1410 bool Session::OnTerminateMessage(const SessionMessage& msg,
1411                                  MessageError* error) {
1412   SessionTerminate term;
1413   if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
1414     return false;
1415 
1416   SignalReceivedTerminateReason(this, term.reason);
1417   if (term.debug_reason != buzz::STR_EMPTY) {
1418     LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
1419   }
1420 
1421   SetState(STATE_RECEIVEDTERMINATE);
1422   return true;
1423 }
1424 
OnTransportInfoMessage(const SessionMessage & msg,MessageError * error)1425 bool Session::OnTransportInfoMessage(const SessionMessage& msg,
1426                                      MessageError* error) {
1427   TransportInfos tinfos;
1428   if (!ParseTransportInfos(msg.protocol, msg.action_elem,
1429                            initiator_description()->contents(),
1430                            GetTransportParsers(), GetCandidateTranslators(),
1431                            &tinfos, error))
1432     return false;
1433 
1434   if (!OnRemoteCandidates(tinfos, error))
1435     return false;
1436 
1437   return true;
1438 }
1439 
OnTransportAcceptMessage(const SessionMessage & msg,MessageError * error)1440 bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
1441                                        MessageError* error) {
1442   // TODO: Currently here only for compatibility with
1443   // Gingle 1.1 clients (notably, Google Voice).
1444   return true;
1445 }
1446 
OnDescriptionInfoMessage(const SessionMessage & msg,MessageError * error)1447 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
1448                               MessageError* error) {
1449   if (!CheckState(STATE_INPROGRESS, error))
1450     return false;
1451 
1452   DescriptionInfo description_info;
1453   if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
1454                             GetContentParsers(), GetTransportParsers(),
1455                             GetCandidateTranslators(),
1456                             &description_info, error)) {
1457     return false;
1458   }
1459 
1460   ContentInfos& updated_contents = description_info.contents;
1461 
1462   // TODO: Currently, reflector sends back
1463   // video stream updates even for an audio-only call, which causes
1464   // this to fail.  Put this back once reflector is fixed.
1465   //
1466   // ContentInfos::iterator it;
1467   // First, ensure all updates are valid before modifying remote_description_.
1468   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1469   //   if (remote_description()->GetContentByName(it->name) == NULL) {
1470   //     return false;
1471   //   }
1472   // }
1473 
1474   // TODO: We used to replace contents from an update, but
1475   // that no longer works with partial updates.  We need to figure out
1476   // a way to merge patial updates into contents.  For now, users of
1477   // Session should listen to SignalRemoteDescriptionUpdate and handle
1478   // updates.  They should not expect remote_description to be the
1479   // latest value.
1480   //
1481   // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1482   //     remote_description()->RemoveContentByName(it->name);
1483   //     remote_description()->AddContent(it->name, it->type, it->description);
1484   //   }
1485   // }
1486 
1487   SignalRemoteDescriptionUpdate(this, updated_contents);
1488   return true;
1489 }
1490 
BareJidsEqual(const std::string & name1,const std::string & name2)1491 bool BareJidsEqual(const std::string& name1,
1492                    const std::string& name2) {
1493   buzz::Jid jid1(name1);
1494   buzz::Jid jid2(name2);
1495 
1496   return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
1497 }
1498 
OnRedirectError(const SessionRedirect & redirect,SessionError * error)1499 bool Session::OnRedirectError(const SessionRedirect& redirect,
1500                               SessionError* error) {
1501   MessageError message_error;
1502   if (!CheckState(STATE_SENTINITIATE, &message_error)) {
1503     return BadWrite(message_error.text, error);
1504   }
1505 
1506   if (!BareJidsEqual(remote_name(), redirect.target))
1507     return BadWrite("Redirection not allowed: must be the same bare jid.",
1508                     error);
1509 
1510   // When we receive a redirect, we point the session at the new JID
1511   // and resend the candidates.
1512   set_remote_name(redirect.target);
1513   return (SendInitiateMessage(local_description(), error) &&
1514           ResendAllTransportInfoMessages(error));
1515 }
1516 
CheckState(State expected,MessageError * error)1517 bool Session::CheckState(State expected, MessageError* error) {
1518   if (state() != expected) {
1519     // The server can deliver messages out of order/repeated for various
1520     // reasons. For example, if the server does not recive our iq response,
1521     // it could assume that the iq it sent was lost, and will then send
1522     // it again. Ideally, we should implement reliable messaging with
1523     // duplicate elimination.
1524     return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
1525                       "message not allowed in current state",
1526                       error);
1527   }
1528   return true;
1529 }
1530 
SetError(Error error,const std::string & error_desc)1531 void Session::SetError(Error error, const std::string& error_desc) {
1532   BaseSession::SetError(error, error_desc);
1533   if (error != ERROR_NONE)
1534     signaling_thread()->Post(this, MSG_ERROR);
1535 }
1536 
OnMessage(talk_base::Message * pmsg)1537 void Session::OnMessage(talk_base::Message* pmsg) {
1538   // preserve this because BaseSession::OnMessage may modify it
1539   State orig_state = state();
1540 
1541   BaseSession::OnMessage(pmsg);
1542 
1543   switch (pmsg->message_id) {
1544   case MSG_ERROR:
1545     TerminateWithReason(STR_TERMINATE_ERROR);
1546     break;
1547 
1548   case MSG_STATE:
1549     switch (orig_state) {
1550     case STATE_SENTREJECT:
1551     case STATE_RECEIVEDREJECT:
1552       // Assume clean termination.
1553       Terminate();
1554       break;
1555 
1556     case STATE_SENTTERMINATE:
1557     case STATE_RECEIVEDTERMINATE:
1558       session_manager_->DestroySession(this);
1559       break;
1560 
1561     default:
1562       // Explicitly ignoring some states here.
1563       break;
1564     }
1565     break;
1566   }
1567 }
1568 
SendInitiateMessage(const SessionDescription * sdesc,SessionError * error)1569 bool Session::SendInitiateMessage(const SessionDescription* sdesc,
1570                                   SessionError* error) {
1571   SessionInitiate init;
1572   init.contents = sdesc->contents();
1573   init.transports = GetEmptyTransportInfos(init.contents);
1574   init.groups = sdesc->groups();
1575   return SendMessage(ACTION_SESSION_INITIATE, init, error);
1576 }
1577 
WriteSessionAction(SignalingProtocol protocol,const SessionInitiate & init,XmlElements * elems,WriteError * error)1578 bool Session::WriteSessionAction(
1579     SignalingProtocol protocol, const SessionInitiate& init,
1580     XmlElements* elems, WriteError* error) {
1581   return WriteSessionInitiate(protocol, init.contents, init.transports,
1582                               GetContentParsers(), GetTransportParsers(),
1583                               GetCandidateTranslators(), init.groups,
1584                               elems, error);
1585 }
1586 
SendAcceptMessage(const SessionDescription * sdesc,SessionError * error)1587 bool Session::SendAcceptMessage(const SessionDescription* sdesc,
1588                                 SessionError* error) {
1589   XmlElements elems;
1590   if (!WriteSessionAccept(current_protocol_,
1591                           sdesc->contents(),
1592                           GetEmptyTransportInfos(sdesc->contents()),
1593                           GetContentParsers(), GetTransportParsers(),
1594                           GetCandidateTranslators(), sdesc->groups(),
1595                           &elems, error)) {
1596     return false;
1597   }
1598   return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
1599 }
1600 
SendRejectMessage(const std::string & reason,SessionError * error)1601 bool Session::SendRejectMessage(const std::string& reason,
1602                                 SessionError* error) {
1603   SessionTerminate term(reason);
1604   return SendMessage(ACTION_SESSION_REJECT, term, error);
1605 }
1606 
SendTerminateMessage(const std::string & reason,SessionError * error)1607 bool Session::SendTerminateMessage(const std::string& reason,
1608                                    SessionError* error) {
1609   SessionTerminate term(reason);
1610   return SendMessage(ACTION_SESSION_TERMINATE, term, error);
1611 }
1612 
WriteSessionAction(SignalingProtocol protocol,const SessionTerminate & term,XmlElements * elems,WriteError * error)1613 bool Session::WriteSessionAction(SignalingProtocol protocol,
1614                                  const SessionTerminate& term,
1615                                  XmlElements* elems, WriteError* error) {
1616   WriteSessionTerminate(protocol, term, elems);
1617   return true;
1618 }
1619 
SendTransportInfoMessage(const TransportInfo & tinfo,SessionError * error)1620 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
1621                                        SessionError* error) {
1622   return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
1623 }
1624 
SendTransportInfoMessage(const TransportProxy * transproxy,const Candidates & candidates,SessionError * error)1625 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
1626                                        const Candidates& candidates,
1627                                        SessionError* error) {
1628   return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
1629       TransportDescription(transproxy->type(), std::vector<std::string>(),
1630                            std::string(), std::string(), ICEMODE_FULL,
1631                            CONNECTIONROLE_NONE, NULL, candidates)), error);
1632 }
1633 
WriteSessionAction(SignalingProtocol protocol,const TransportInfo & tinfo,XmlElements * elems,WriteError * error)1634 bool Session::WriteSessionAction(SignalingProtocol protocol,
1635                                  const TransportInfo& tinfo,
1636                                  XmlElements* elems, WriteError* error) {
1637   TransportInfos tinfos;
1638   tinfos.push_back(tinfo);
1639   return WriteTransportInfos(protocol, tinfos,
1640                              GetTransportParsers(), GetCandidateTranslators(),
1641                              elems, error);
1642 }
1643 
ResendAllTransportInfoMessages(SessionError * error)1644 bool Session::ResendAllTransportInfoMessages(SessionError* error) {
1645   for (TransportMap::const_iterator iter = transport_proxies().begin();
1646        iter != transport_proxies().end(); ++iter) {
1647     TransportProxy* transproxy = iter->second;
1648     if (transproxy->sent_candidates().size() > 0) {
1649       if (!SendTransportInfoMessage(
1650               transproxy, transproxy->sent_candidates(), error)) {
1651         LOG(LS_ERROR) << "Could not resend transport info messages: "
1652                       << error->text;
1653         return false;
1654       }
1655       transproxy->ClearSentCandidates();
1656     }
1657   }
1658   return true;
1659 }
1660 
SendAllUnsentTransportInfoMessages(SessionError * error)1661 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
1662   for (TransportMap::const_iterator iter = transport_proxies().begin();
1663        iter != transport_proxies().end(); ++iter) {
1664     TransportProxy* transproxy = iter->second;
1665     if (transproxy->unsent_candidates().size() > 0) {
1666       if (!SendTransportInfoMessage(
1667               transproxy, transproxy->unsent_candidates(), error)) {
1668         LOG(LS_ERROR) << "Could not send unsent transport info messages: "
1669                       << error->text;
1670         return false;
1671       }
1672       transproxy->ClearUnsentCandidates();
1673     }
1674   }
1675   return true;
1676 }
1677 
SendMessage(ActionType type,const XmlElements & action_elems,SessionError * error)1678 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1679                           SessionError* error) {
1680     return SendMessage(type, action_elems, remote_name(), error);
1681 }
1682 
SendMessage(ActionType type,const XmlElements & action_elems,const std::string & remote_name,SessionError * error)1683 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1684                           const std::string& remote_name, SessionError* error) {
1685   talk_base::scoped_ptr<buzz::XmlElement> stanza(
1686       new buzz::XmlElement(buzz::QN_IQ));
1687 
1688   SessionMessage msg(current_protocol_, type, id(), initiator_name());
1689   msg.to = remote_name;
1690   WriteSessionMessage(msg, action_elems, stanza.get());
1691 
1692   SignalOutgoingMessage(this, stanza.get());
1693   return true;
1694 }
1695 
1696 template <typename Action>
SendMessage(ActionType type,const Action & action,SessionError * error)1697 bool Session::SendMessage(ActionType type, const Action& action,
1698                           SessionError* error) {
1699   talk_base::scoped_ptr<buzz::XmlElement> stanza(
1700       new buzz::XmlElement(buzz::QN_IQ));
1701   if (!WriteActionMessage(type, action, stanza.get(), error))
1702     return false;
1703 
1704   SignalOutgoingMessage(this, stanza.get());
1705   return true;
1706 }
1707 
1708 template <typename Action>
WriteActionMessage(ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1709 bool Session::WriteActionMessage(ActionType type, const Action& action,
1710                                  buzz::XmlElement* stanza,
1711                                  WriteError* error) {
1712   if (current_protocol_ == PROTOCOL_HYBRID) {
1713     if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
1714       return false;
1715     if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
1716       return false;
1717   } else {
1718     if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
1719       return false;
1720   }
1721   return true;
1722 }
1723 
1724 template <typename Action>
WriteActionMessage(SignalingProtocol protocol,ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1725 bool Session::WriteActionMessage(SignalingProtocol protocol,
1726                                  ActionType type, const Action& action,
1727                                  buzz::XmlElement* stanza, WriteError* error) {
1728   XmlElements action_elems;
1729   if (!WriteSessionAction(protocol, action, &action_elems, error))
1730     return false;
1731 
1732   SessionMessage msg(protocol, type, id(), initiator_name());
1733   msg.to = remote_name();
1734 
1735   WriteSessionMessage(msg, action_elems, stanza);
1736   return true;
1737 }
1738 
SendAcknowledgementMessage(const buzz::XmlElement * stanza)1739 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
1740   talk_base::scoped_ptr<buzz::XmlElement> ack(
1741       new buzz::XmlElement(buzz::QN_IQ));
1742   ack->SetAttr(buzz::QN_TO, remote_name());
1743   ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
1744   ack->SetAttr(buzz::QN_TYPE, "result");
1745 
1746   SignalOutgoingMessage(this, ack.get());
1747 }
1748 
1749 }  // namespace cricket
1750