1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include "talk/p2p/base/session.h"
29
30 #include "talk/p2p/base/dtlstransport.h"
31 #include "talk/p2p/base/p2ptransport.h"
32 #include "talk/p2p/base/sessionclient.h"
33 #include "talk/p2p/base/transport.h"
34 #include "talk/p2p/base/transportchannelproxy.h"
35 #include "talk/p2p/base/transportinfo.h"
36 #include "talk/xmpp/constants.h"
37 #include "talk/xmpp/jid.h"
38 #include "webrtc/base/bind.h"
39 #include "webrtc/base/common.h"
40 #include "webrtc/base/helpers.h"
41 #include "webrtc/base/logging.h"
42 #include "webrtc/base/scoped_ptr.h"
43 #include "webrtc/base/sslstreamadapter.h"
44
45 #include "talk/p2p/base/constants.h"
46
47 namespace cricket {
48
49 using rtc::Bind;
50
BadMessage(const buzz::QName type,const std::string & text,MessageError * err)51 bool BadMessage(const buzz::QName type,
52 const std::string& text,
53 MessageError* err) {
54 err->SetType(type);
55 err->SetText(text);
56 return false;
57 }
58
~TransportProxy()59 TransportProxy::~TransportProxy() {
60 for (ChannelMap::iterator iter = channels_.begin();
61 iter != channels_.end(); ++iter) {
62 iter->second->SignalDestroyed(iter->second);
63 delete iter->second;
64 }
65 }
66
type() const67 const std::string& TransportProxy::type() const {
68 return transport_->get()->type();
69 }
70
GetChannel(int component)71 TransportChannel* TransportProxy::GetChannel(int component) {
72 ASSERT(rtc::Thread::Current() == worker_thread_);
73 return GetChannelProxy(component);
74 }
75
CreateChannel(const std::string & name,int component)76 TransportChannel* TransportProxy::CreateChannel(
77 const std::string& name, int component) {
78 ASSERT(rtc::Thread::Current() == worker_thread_);
79 ASSERT(GetChannel(component) == NULL);
80 ASSERT(!transport_->get()->HasChannel(component));
81
82 // We always create a proxy in case we need to change out the transport later.
83 TransportChannelProxy* channel =
84 new TransportChannelProxy(content_name(), name, component);
85 channels_[component] = channel;
86
87 // If we're already negotiated, create an impl and hook it up to the proxy
88 // channel. If we're connecting, create an impl but don't hook it up yet.
89 if (negotiated_) {
90 SetupChannelProxy_w(component, channel);
91 } else if (connecting_) {
92 GetOrCreateChannelProxyImpl_w(component);
93 }
94 return channel;
95 }
96
HasChannel(int component)97 bool TransportProxy::HasChannel(int component) {
98 return transport_->get()->HasChannel(component);
99 }
100
DestroyChannel(int component)101 void TransportProxy::DestroyChannel(int component) {
102 ASSERT(rtc::Thread::Current() == worker_thread_);
103 TransportChannel* channel = GetChannel(component);
104 if (channel) {
105 // If the state of TransportProxy is not NEGOTIATED
106 // then TransportChannelProxy and its impl are not
107 // connected. Both must be connected before
108 // deletion.
109 if (!negotiated_) {
110 SetupChannelProxy_w(component, GetChannelProxy(component));
111 }
112
113 channels_.erase(component);
114 channel->SignalDestroyed(channel);
115 delete channel;
116 }
117 }
118
ConnectChannels()119 void TransportProxy::ConnectChannels() {
120 if (!connecting_) {
121 if (!negotiated_) {
122 for (ChannelMap::iterator iter = channels_.begin();
123 iter != channels_.end(); ++iter) {
124 GetOrCreateChannelProxyImpl(iter->first);
125 }
126 }
127 connecting_ = true;
128 }
129 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
130 // don't have any channels yet, so we need to allow this method to be called
131 // multiple times. Once we fix Transport, we can move this call inside the
132 // if (!connecting_) block.
133 transport_->get()->ConnectChannels();
134 }
135
CompleteNegotiation()136 void TransportProxy::CompleteNegotiation() {
137 if (!negotiated_) {
138 for (ChannelMap::iterator iter = channels_.begin();
139 iter != channels_.end(); ++iter) {
140 SetupChannelProxy(iter->first, iter->second);
141 }
142 negotiated_ = true;
143 }
144 }
145
AddSentCandidates(const Candidates & candidates)146 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
147 for (Candidates::const_iterator cand = candidates.begin();
148 cand != candidates.end(); ++cand) {
149 sent_candidates_.push_back(*cand);
150 }
151 }
152
AddUnsentCandidates(const Candidates & candidates)153 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
154 for (Candidates::const_iterator cand = candidates.begin();
155 cand != candidates.end(); ++cand) {
156 unsent_candidates_.push_back(*cand);
157 }
158 }
159
GetChannelNameFromComponent(int component,std::string * channel_name) const160 bool TransportProxy::GetChannelNameFromComponent(
161 int component, std::string* channel_name) const {
162 const TransportChannelProxy* channel = GetChannelProxy(component);
163 if (channel == NULL) {
164 return false;
165 }
166
167 *channel_name = channel->name();
168 return true;
169 }
170
GetComponentFromChannelName(const std::string & channel_name,int * component) const171 bool TransportProxy::GetComponentFromChannelName(
172 const std::string& channel_name, int* component) const {
173 const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
174 if (channel == NULL) {
175 return false;
176 }
177
178 *component = channel->component();
179 return true;
180 }
181
GetChannelProxy(int component) const182 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
183 ChannelMap::const_iterator iter = channels_.find(component);
184 return (iter != channels_.end()) ? iter->second : NULL;
185 }
186
GetChannelProxyByName(const std::string & name) const187 TransportChannelProxy* TransportProxy::GetChannelProxyByName(
188 const std::string& name) const {
189 for (ChannelMap::const_iterator iter = channels_.begin();
190 iter != channels_.end();
191 ++iter) {
192 if (iter->second->name() == name) {
193 return iter->second;
194 }
195 }
196 return NULL;
197 }
198
GetOrCreateChannelProxyImpl(int component)199 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
200 int component) {
201 return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
202 &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
203 }
204
GetOrCreateChannelProxyImpl_w(int component)205 TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w(
206 int component) {
207 ASSERT(rtc::Thread::Current() == worker_thread_);
208 TransportChannelImpl* impl = transport_->get()->GetChannel(component);
209 if (impl == NULL) {
210 impl = transport_->get()->CreateChannel(component);
211 }
212 return impl;
213 }
214
SetupChannelProxy(int component,TransportChannelProxy * transproxy)215 void TransportProxy::SetupChannelProxy(
216 int component, TransportChannelProxy* transproxy) {
217 worker_thread_->Invoke<void>(Bind(
218 &TransportProxy::SetupChannelProxy_w, this, component, transproxy));
219 }
220
SetupChannelProxy_w(int component,TransportChannelProxy * transproxy)221 void TransportProxy::SetupChannelProxy_w(
222 int component, TransportChannelProxy* transproxy) {
223 ASSERT(rtc::Thread::Current() == worker_thread_);
224 TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
225 ASSERT(impl != NULL);
226 transproxy->SetImplementation(impl);
227 }
228
ReplaceChannelProxyImpl(TransportChannelProxy * proxy,TransportChannelImpl * impl)229 void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
230 TransportChannelImpl* impl) {
231 worker_thread_->Invoke<void>(Bind(
232 &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl));
233 }
234
ReplaceChannelProxyImpl_w(TransportChannelProxy * proxy,TransportChannelImpl * impl)235 void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy,
236 TransportChannelImpl* impl) {
237 ASSERT(rtc::Thread::Current() == worker_thread_);
238 ASSERT(proxy != NULL);
239 proxy->SetImplementation(impl);
240 }
241
242 // This function muxes |this| onto |target| by repointing |this| at
243 // |target|'s transport and setting our TransportChannelProxies
244 // to point to |target|'s underlying implementations.
SetupMux(TransportProxy * target)245 bool TransportProxy::SetupMux(TransportProxy* target) {
246 // Bail out if there's nothing to do.
247 if (transport_ == target->transport_) {
248 return true;
249 }
250
251 // Run through all channels and remove any non-rtp transport channels before
252 // setting target transport channels.
253 for (ChannelMap::const_iterator iter = channels_.begin();
254 iter != channels_.end(); ++iter) {
255 if (!target->transport_->get()->HasChannel(iter->first)) {
256 // Remove if channel doesn't exist in |transport_|.
257 ReplaceChannelProxyImpl(iter->second, NULL);
258 } else {
259 // Replace the impl for all the TransportProxyChannels with the channels
260 // from |target|'s transport. Fail if there's not an exact match.
261 ReplaceChannelProxyImpl(
262 iter->second, target->transport_->get()->CreateChannel(iter->first));
263 }
264 }
265
266 // Now replace our transport. Must happen afterwards because
267 // it deletes all impls as a side effect.
268 transport_ = target->transport_;
269 transport_->get()->SignalCandidatesReady.connect(
270 this, &TransportProxy::OnTransportCandidatesReady);
271 set_candidates_allocated(target->candidates_allocated());
272 return true;
273 }
274
SetIceRole(IceRole role)275 void TransportProxy::SetIceRole(IceRole role) {
276 transport_->get()->SetIceRole(role);
277 }
278
SetLocalTransportDescription(const TransportDescription & description,ContentAction action,std::string * error_desc)279 bool TransportProxy::SetLocalTransportDescription(
280 const TransportDescription& description,
281 ContentAction action,
282 std::string* error_desc) {
283 // If this is an answer, finalize the negotiation.
284 if (action == CA_ANSWER) {
285 CompleteNegotiation();
286 }
287 bool result = transport_->get()->SetLocalTransportDescription(description,
288 action,
289 error_desc);
290 if (result)
291 local_description_set_ = true;
292 return result;
293 }
294
SetRemoteTransportDescription(const TransportDescription & description,ContentAction action,std::string * error_desc)295 bool TransportProxy::SetRemoteTransportDescription(
296 const TransportDescription& description,
297 ContentAction action,
298 std::string* error_desc) {
299 // If this is an answer, finalize the negotiation.
300 if (action == CA_ANSWER) {
301 CompleteNegotiation();
302 }
303 bool result = transport_->get()->SetRemoteTransportDescription(description,
304 action,
305 error_desc);
306 if (result)
307 remote_description_set_ = true;
308 return result;
309 }
310
OnSignalingReady()311 void TransportProxy::OnSignalingReady() {
312 // If we're starting a new allocation sequence, reset our state.
313 set_candidates_allocated(false);
314 transport_->get()->OnSignalingReady();
315 }
316
OnRemoteCandidates(const Candidates & candidates,std::string * error)317 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
318 std::string* error) {
319 // Ensure the transport is negotiated before handling candidates.
320 // TODO(juberti): Remove this once everybody calls SetLocalTD.
321 CompleteNegotiation();
322
323 // Verify each candidate before passing down to transport layer.
324 for (Candidates::const_iterator cand = candidates.begin();
325 cand != candidates.end(); ++cand) {
326 if (!transport_->get()->VerifyCandidate(*cand, error))
327 return false;
328 if (!HasChannel(cand->component())) {
329 *error = "Candidate has unknown component: " + cand->ToString() +
330 " for content: " + content_name_;
331 return false;
332 }
333 }
334 transport_->get()->OnRemoteCandidates(candidates);
335 return true;
336 }
337
SetIdentity(rtc::SSLIdentity * identity)338 void TransportProxy::SetIdentity(
339 rtc::SSLIdentity* identity) {
340 transport_->get()->SetIdentity(identity);
341 }
342
StateToString(State state)343 std::string BaseSession::StateToString(State state) {
344 switch (state) {
345 case Session::STATE_INIT:
346 return "STATE_INIT";
347 case Session::STATE_SENTINITIATE:
348 return "STATE_SENTINITIATE";
349 case Session::STATE_RECEIVEDINITIATE:
350 return "STATE_RECEIVEDINITIATE";
351 case Session::STATE_SENTPRACCEPT:
352 return "STATE_SENTPRACCEPT";
353 case Session::STATE_SENTACCEPT:
354 return "STATE_SENTACCEPT";
355 case Session::STATE_RECEIVEDPRACCEPT:
356 return "STATE_RECEIVEDPRACCEPT";
357 case Session::STATE_RECEIVEDACCEPT:
358 return "STATE_RECEIVEDACCEPT";
359 case Session::STATE_SENTMODIFY:
360 return "STATE_SENTMODIFY";
361 case Session::STATE_RECEIVEDMODIFY:
362 return "STATE_RECEIVEDMODIFY";
363 case Session::STATE_SENTREJECT:
364 return "STATE_SENTREJECT";
365 case Session::STATE_RECEIVEDREJECT:
366 return "STATE_RECEIVEDREJECT";
367 case Session::STATE_SENTREDIRECT:
368 return "STATE_SENTREDIRECT";
369 case Session::STATE_SENTTERMINATE:
370 return "STATE_SENTTERMINATE";
371 case Session::STATE_RECEIVEDTERMINATE:
372 return "STATE_RECEIVEDTERMINATE";
373 case Session::STATE_INPROGRESS:
374 return "STATE_INPROGRESS";
375 case Session::STATE_DEINIT:
376 return "STATE_DEINIT";
377 default:
378 break;
379 }
380 return "STATE_" + rtc::ToString(state);
381 }
382
BaseSession(rtc::Thread * signaling_thread,rtc::Thread * worker_thread,PortAllocator * port_allocator,const std::string & sid,const std::string & content_type,bool initiator)383 BaseSession::BaseSession(rtc::Thread* signaling_thread,
384 rtc::Thread* worker_thread,
385 PortAllocator* port_allocator,
386 const std::string& sid,
387 const std::string& content_type,
388 bool initiator)
389 : state_(STATE_INIT),
390 error_(ERROR_NONE),
391 signaling_thread_(signaling_thread),
392 worker_thread_(worker_thread),
393 port_allocator_(port_allocator),
394 sid_(sid),
395 content_type_(content_type),
396 transport_type_(NS_GINGLE_P2P),
397 initiator_(initiator),
398 identity_(NULL),
399 ice_tiebreaker_(rtc::CreateRandomId64()),
400 role_switch_(false) {
401 ASSERT(signaling_thread->IsCurrent());
402 }
403
~BaseSession()404 BaseSession::~BaseSession() {
405 ASSERT(signaling_thread()->IsCurrent());
406
407 ASSERT(state_ != STATE_DEINIT);
408 LogState(state_, STATE_DEINIT);
409 state_ = STATE_DEINIT;
410 SignalState(this, state_);
411
412 for (TransportMap::iterator iter = transports_.begin();
413 iter != transports_.end(); ++iter) {
414 delete iter->second;
415 }
416 }
417
local_description() const418 const SessionDescription* BaseSession::local_description() const {
419 // TODO(tommi): Assert on thread correctness.
420 return local_description_.get();
421 }
422
remote_description() const423 const SessionDescription* BaseSession::remote_description() const {
424 // TODO(tommi): Assert on thread correctness.
425 return remote_description_.get();
426 }
427
remote_description()428 SessionDescription* BaseSession::remote_description() {
429 // TODO(tommi): Assert on thread correctness.
430 return remote_description_.get();
431 }
432
set_local_description(const SessionDescription * sdesc)433 void BaseSession::set_local_description(const SessionDescription* sdesc) {
434 // TODO(tommi): Assert on thread correctness.
435 if (sdesc != local_description_.get())
436 local_description_.reset(sdesc);
437 }
438
set_remote_description(SessionDescription * sdesc)439 void BaseSession::set_remote_description(SessionDescription* sdesc) {
440 // TODO(tommi): Assert on thread correctness.
441 if (sdesc != remote_description_)
442 remote_description_.reset(sdesc);
443 }
444
initiator_description() const445 const SessionDescription* BaseSession::initiator_description() const {
446 // TODO(tommi): Assert on thread correctness.
447 return initiator_ ? local_description_.get() : remote_description_.get();
448 }
449
SetIdentity(rtc::SSLIdentity * identity)450 bool BaseSession::SetIdentity(rtc::SSLIdentity* identity) {
451 if (identity_)
452 return false;
453 identity_ = identity;
454 for (TransportMap::iterator iter = transports_.begin();
455 iter != transports_.end(); ++iter) {
456 iter->second->SetIdentity(identity_);
457 }
458 return true;
459 }
460
PushdownTransportDescription(ContentSource source,ContentAction action,std::string * error_desc)461 bool BaseSession::PushdownTransportDescription(ContentSource source,
462 ContentAction action,
463 std::string* error_desc) {
464 if (source == CS_LOCAL) {
465 return PushdownLocalTransportDescription(local_description(),
466 action,
467 error_desc);
468 }
469 return PushdownRemoteTransportDescription(remote_description(),
470 action,
471 error_desc);
472 }
473
PushdownLocalTransportDescription(const SessionDescription * sdesc,ContentAction action,std::string * error_desc)474 bool BaseSession::PushdownLocalTransportDescription(
475 const SessionDescription* sdesc,
476 ContentAction action,
477 std::string* error_desc) {
478 // Update the Transports with the right information, and trigger them to
479 // start connecting.
480 for (TransportMap::iterator iter = transports_.begin();
481 iter != transports_.end(); ++iter) {
482 // If no transport info was in this session description, ret == false
483 // and we just skip this one.
484 TransportDescription tdesc;
485 bool ret = GetTransportDescription(
486 sdesc, iter->second->content_name(), &tdesc);
487 if (ret) {
488 if (!iter->second->SetLocalTransportDescription(tdesc, action,
489 error_desc)) {
490 return false;
491 }
492
493 iter->second->ConnectChannels();
494 }
495 }
496
497 return true;
498 }
499
PushdownRemoteTransportDescription(const SessionDescription * sdesc,ContentAction action,std::string * error_desc)500 bool BaseSession::PushdownRemoteTransportDescription(
501 const SessionDescription* sdesc,
502 ContentAction action,
503 std::string* error_desc) {
504 // Update the Transports with the right information.
505 for (TransportMap::iterator iter = transports_.begin();
506 iter != transports_.end(); ++iter) {
507 TransportDescription tdesc;
508
509 // If no transport info was in this session description, ret == false
510 // and we just skip this one.
511 bool ret = GetTransportDescription(
512 sdesc, iter->second->content_name(), &tdesc);
513 if (ret) {
514 if (!iter->second->SetRemoteTransportDescription(tdesc, action,
515 error_desc)) {
516 return false;
517 }
518 }
519 }
520
521 return true;
522 }
523
CreateChannel(const std::string & content_name,const std::string & channel_name,int component)524 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
525 const std::string& channel_name,
526 int component) {
527 // We create the proxy "on demand" here because we need to support
528 // creating channels at any time, even before we send or receive
529 // initiate messages, which is before we create the transports.
530 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
531 return transproxy->CreateChannel(channel_name, component);
532 }
533
GetChannel(const std::string & content_name,int component)534 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
535 int component) {
536 TransportProxy* transproxy = GetTransportProxy(content_name);
537 if (transproxy == NULL)
538 return NULL;
539
540 return transproxy->GetChannel(component);
541 }
542
DestroyChannel(const std::string & content_name,int component)543 void BaseSession::DestroyChannel(const std::string& content_name,
544 int component) {
545 TransportProxy* transproxy = GetTransportProxy(content_name);
546 ASSERT(transproxy != NULL);
547 transproxy->DestroyChannel(component);
548 }
549
GetOrCreateTransportProxy(const std::string & content_name)550 TransportProxy* BaseSession::GetOrCreateTransportProxy(
551 const std::string& content_name) {
552 TransportProxy* transproxy = GetTransportProxy(content_name);
553 if (transproxy)
554 return transproxy;
555
556 Transport* transport = CreateTransport(content_name);
557 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
558 transport->SetIceTiebreaker(ice_tiebreaker_);
559 // TODO: Connect all the Transport signals to TransportProxy
560 // then to the BaseSession.
561 transport->SignalConnecting.connect(
562 this, &BaseSession::OnTransportConnecting);
563 transport->SignalWritableState.connect(
564 this, &BaseSession::OnTransportWritable);
565 transport->SignalRequestSignaling.connect(
566 this, &BaseSession::OnTransportRequestSignaling);
567 transport->SignalTransportError.connect(
568 this, &BaseSession::OnTransportSendError);
569 transport->SignalRouteChange.connect(
570 this, &BaseSession::OnTransportRouteChange);
571 transport->SignalCandidatesAllocationDone.connect(
572 this, &BaseSession::OnTransportCandidatesAllocationDone);
573 transport->SignalRoleConflict.connect(
574 this, &BaseSession::OnRoleConflict);
575 transport->SignalCompleted.connect(
576 this, &BaseSession::OnTransportCompleted);
577 transport->SignalFailed.connect(
578 this, &BaseSession::OnTransportFailed);
579
580 transproxy = new TransportProxy(worker_thread_, sid_, content_name,
581 new TransportWrapper(transport));
582 transproxy->SignalCandidatesReady.connect(
583 this, &BaseSession::OnTransportProxyCandidatesReady);
584 if (identity_)
585 transproxy->SetIdentity(identity_);
586 transports_[content_name] = transproxy;
587
588 return transproxy;
589 }
590
GetTransport(const std::string & content_name)591 Transport* BaseSession::GetTransport(const std::string& content_name) {
592 TransportProxy* transproxy = GetTransportProxy(content_name);
593 if (transproxy == NULL)
594 return NULL;
595 return transproxy->impl();
596 }
597
GetTransportProxy(const std::string & content_name)598 TransportProxy* BaseSession::GetTransportProxy(
599 const std::string& content_name) {
600 TransportMap::iterator iter = transports_.find(content_name);
601 return (iter != transports_.end()) ? iter->second : NULL;
602 }
603
GetTransportProxy(const Transport * transport)604 TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
605 for (TransportMap::iterator iter = transports_.begin();
606 iter != transports_.end(); ++iter) {
607 TransportProxy* transproxy = iter->second;
608 if (transproxy->impl() == transport) {
609 return transproxy;
610 }
611 }
612 return NULL;
613 }
614
GetFirstTransportProxy()615 TransportProxy* BaseSession::GetFirstTransportProxy() {
616 if (transports_.empty())
617 return NULL;
618 return transports_.begin()->second;
619 }
620
DestroyTransportProxy(const std::string & content_name)621 void BaseSession::DestroyTransportProxy(
622 const std::string& content_name) {
623 TransportMap::iterator iter = transports_.find(content_name);
624 if (iter != transports_.end()) {
625 delete iter->second;
626 transports_.erase(content_name);
627 }
628 }
629
CreateTransport(const std::string & content_name)630 cricket::Transport* BaseSession::CreateTransport(
631 const std::string& content_name) {
632 ASSERT(transport_type_ == NS_GINGLE_P2P);
633 return new cricket::DtlsTransport<P2PTransport>(
634 signaling_thread(), worker_thread(), content_name,
635 port_allocator(), identity_);
636 }
637
GetStats(SessionStats * stats)638 bool BaseSession::GetStats(SessionStats* stats) {
639 for (TransportMap::iterator iter = transports_.begin();
640 iter != transports_.end(); ++iter) {
641 std::string proxy_id = iter->second->content_name();
642 // We are ignoring not-yet-instantiated transports.
643 if (iter->second->impl()) {
644 std::string transport_id = iter->second->impl()->content_name();
645 stats->proxy_to_transport[proxy_id] = transport_id;
646 if (stats->transport_stats.find(transport_id)
647 == stats->transport_stats.end()) {
648 TransportStats subinfos;
649 if (!iter->second->impl()->GetStats(&subinfos)) {
650 return false;
651 }
652 stats->transport_stats[transport_id] = subinfos;
653 }
654 }
655 }
656 return true;
657 }
658
SetState(State state)659 void BaseSession::SetState(State state) {
660 ASSERT(signaling_thread_->IsCurrent());
661 if (state != state_) {
662 LogState(state_, state);
663 state_ = state;
664 SignalState(this, state_);
665 signaling_thread_->Post(this, MSG_STATE);
666 }
667 SignalNewDescription();
668 }
669
SetError(Error error,const std::string & error_desc)670 void BaseSession::SetError(Error error, const std::string& error_desc) {
671 ASSERT(signaling_thread_->IsCurrent());
672 if (error != error_) {
673 error_ = error;
674 error_desc_ = error_desc;
675 SignalError(this, error);
676 }
677 }
678
OnSignalingReady()679 void BaseSession::OnSignalingReady() {
680 ASSERT(signaling_thread()->IsCurrent());
681 for (TransportMap::iterator iter = transports_.begin();
682 iter != transports_.end(); ++iter) {
683 iter->second->OnSignalingReady();
684 }
685 }
686
687 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to
688 // start, remove this method once everyone calls PushdownLocalTD.
SpeculativelyConnectAllTransportChannels()689 void BaseSession::SpeculativelyConnectAllTransportChannels() {
690 // Put all transports into the connecting state.
691 for (TransportMap::iterator iter = transports_.begin();
692 iter != transports_.end(); ++iter) {
693 iter->second->ConnectChannels();
694 }
695 }
696
OnRemoteCandidates(const std::string & content_name,const Candidates & candidates,std::string * error)697 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
698 const Candidates& candidates,
699 std::string* error) {
700 // Give candidates to the appropriate transport, and tell that transport
701 // to start connecting, if it's not already doing so.
702 TransportProxy* transproxy = GetTransportProxy(content_name);
703 if (!transproxy) {
704 *error = "Unknown content name " + content_name;
705 return false;
706 }
707 if (!transproxy->OnRemoteCandidates(candidates, error)) {
708 return false;
709 }
710 // TODO(juberti): Remove this call once we can be sure that we always have
711 // a local transport description (which will trigger the connection).
712 transproxy->ConnectChannels();
713 return true;
714 }
715
MaybeEnableMuxingSupport()716 bool BaseSession::MaybeEnableMuxingSupport() {
717 // We need both a local and remote description to decide if we should mux.
718 if ((state_ == STATE_SENTINITIATE ||
719 state_ == STATE_RECEIVEDINITIATE) &&
720 ((local_description_ == NULL) ||
721 (remote_description_ == NULL))) {
722 return false;
723 }
724
725 // In order to perform the multiplexing, we need all proxies to be in the
726 // negotiated state, i.e. to have implementations underneath.
727 // Ensure that this is the case, regardless of whether we are going to mux.
728 for (TransportMap::iterator iter = transports_.begin();
729 iter != transports_.end(); ++iter) {
730 ASSERT(iter->second->negotiated());
731 if (!iter->second->negotiated())
732 return false;
733 }
734
735 // If both sides agree to BUNDLE, mux all the specified contents onto the
736 // transport belonging to the first content name in the BUNDLE group.
737 // If the contents are already muxed, this will be a no-op.
738 // TODO(juberti): Should this check that local and remote have configured
739 // BUNDLE the same way?
740 bool candidates_allocated = IsCandidateAllocationDone();
741 const ContentGroup* local_bundle_group =
742 local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
743 const ContentGroup* remote_bundle_group =
744 remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
745 if (local_bundle_group && remote_bundle_group &&
746 local_bundle_group->FirstContentName()) {
747 const std::string* content_name = local_bundle_group->FirstContentName();
748 const ContentInfo* content =
749 local_description_->GetContentByName(*content_name);
750 ASSERT(content != NULL);
751 if (!SetSelectedProxy(content->name, local_bundle_group)) {
752 LOG(LS_WARNING) << "Failed to set up BUNDLE";
753 return false;
754 }
755
756 // If we weren't done gathering before, we might be done now, as a result
757 // of enabling mux.
758 LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
759 << *content_name;
760 if (!candidates_allocated) {
761 MaybeCandidateAllocationDone();
762 }
763 } else {
764 LOG(LS_INFO) << "No BUNDLE information, not bundling.";
765 }
766 return true;
767 }
768
SetSelectedProxy(const std::string & content_name,const ContentGroup * muxed_group)769 bool BaseSession::SetSelectedProxy(const std::string& content_name,
770 const ContentGroup* muxed_group) {
771 TransportProxy* selected_proxy = GetTransportProxy(content_name);
772 if (!selected_proxy) {
773 return false;
774 }
775
776 ASSERT(selected_proxy->negotiated());
777 for (TransportMap::iterator iter = transports_.begin();
778 iter != transports_.end(); ++iter) {
779 // If content is part of the mux group, then repoint its proxy at the
780 // transport object that we have chosen to mux onto. If the proxy
781 // is already pointing at the right object, it will be a no-op.
782 if (muxed_group->HasContentName(iter->first) &&
783 !iter->second->SetupMux(selected_proxy)) {
784 return false;
785 }
786 }
787 return true;
788 }
789
OnTransportCandidatesAllocationDone(Transport * transport)790 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
791 // TODO(juberti): This is a clunky way of processing the done signal. Instead,
792 // TransportProxy should receive the done signal directly, set its allocated
793 // flag internally, and then reissue the done signal to Session.
794 // Overall we should make TransportProxy receive *all* the signals from
795 // Transport, since this removes the need to manually iterate over all
796 // the transports, as is needed to make sure signals are handled properly
797 // when BUNDLEing.
798 // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
799 // that make it prohibitively difficult to run dbg builds. Disabled for now.
800 //ASSERT(!IsCandidateAllocationDone());
801 for (TransportMap::iterator iter = transports_.begin();
802 iter != transports_.end(); ++iter) {
803 if (iter->second->impl() == transport) {
804 iter->second->set_candidates_allocated(true);
805 }
806 }
807 MaybeCandidateAllocationDone();
808 }
809
IsCandidateAllocationDone() const810 bool BaseSession::IsCandidateAllocationDone() const {
811 for (TransportMap::const_iterator iter = transports_.begin();
812 iter != transports_.end(); ++iter) {
813 if (!iter->second->candidates_allocated())
814 return false;
815 }
816 return true;
817 }
818
MaybeCandidateAllocationDone()819 void BaseSession::MaybeCandidateAllocationDone() {
820 if (IsCandidateAllocationDone()) {
821 LOG(LS_INFO) << "Candidate gathering is complete.";
822 OnCandidatesAllocationDone();
823 }
824 }
825
OnRoleConflict()826 void BaseSession::OnRoleConflict() {
827 if (role_switch_) {
828 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
829 return;
830 }
831
832 role_switch_ = true;
833 for (TransportMap::iterator iter = transports_.begin();
834 iter != transports_.end(); ++iter) {
835 // Role will be reverse of initial role setting.
836 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
837 iter->second->SetIceRole(role);
838 }
839 }
840
LogState(State old_state,State new_state)841 void BaseSession::LogState(State old_state, State new_state) {
842 LOG(LS_INFO) << "Session:" << id()
843 << " Old state:" << StateToString(old_state)
844 << " New state:" << StateToString(new_state)
845 << " Type:" << content_type()
846 << " Transport:" << transport_type();
847 }
848
849 // static
GetTransportDescription(const SessionDescription * description,const std::string & content_name,TransportDescription * tdesc)850 bool BaseSession::GetTransportDescription(const SessionDescription* description,
851 const std::string& content_name,
852 TransportDescription* tdesc) {
853 if (!description || !tdesc) {
854 return false;
855 }
856 const TransportInfo* transport_info =
857 description->GetTransportInfoByName(content_name);
858 if (!transport_info) {
859 return false;
860 }
861 *tdesc = transport_info->description;
862 return true;
863 }
864
SignalNewDescription()865 void BaseSession::SignalNewDescription() {
866 ContentAction action;
867 ContentSource source;
868 if (!GetContentAction(&action, &source)) {
869 return;
870 }
871 if (source == CS_LOCAL) {
872 SignalNewLocalDescription(this, action);
873 } else {
874 SignalNewRemoteDescription(this, action);
875 }
876 }
877
GetContentAction(ContentAction * action,ContentSource * source)878 bool BaseSession::GetContentAction(ContentAction* action,
879 ContentSource* source) {
880 switch (state_) {
881 // new local description
882 case STATE_SENTINITIATE:
883 *action = CA_OFFER;
884 *source = CS_LOCAL;
885 break;
886 case STATE_SENTPRACCEPT:
887 *action = CA_PRANSWER;
888 *source = CS_LOCAL;
889 break;
890 case STATE_SENTACCEPT:
891 *action = CA_ANSWER;
892 *source = CS_LOCAL;
893 break;
894 // new remote description
895 case STATE_RECEIVEDINITIATE:
896 *action = CA_OFFER;
897 *source = CS_REMOTE;
898 break;
899 case STATE_RECEIVEDPRACCEPT:
900 *action = CA_PRANSWER;
901 *source = CS_REMOTE;
902 break;
903 case STATE_RECEIVEDACCEPT:
904 *action = CA_ANSWER;
905 *source = CS_REMOTE;
906 break;
907 default:
908 return false;
909 }
910 return true;
911 }
912
OnMessage(rtc::Message * pmsg)913 void BaseSession::OnMessage(rtc::Message *pmsg) {
914 switch (pmsg->message_id) {
915 case MSG_TIMEOUT:
916 // Session timeout has occured.
917 SetError(ERROR_TIME, "Session timeout has occured.");
918 break;
919
920 case MSG_STATE:
921 switch (state_) {
922 case STATE_SENTACCEPT:
923 case STATE_RECEIVEDACCEPT:
924 SetState(STATE_INPROGRESS);
925 break;
926
927 default:
928 // Explicitly ignoring some states here.
929 break;
930 }
931 break;
932 }
933 }
934
Session(SessionManager * session_manager,const std::string & local_name,const std::string & initiator_name,const std::string & sid,const std::string & content_type,SessionClient * client)935 Session::Session(SessionManager* session_manager,
936 const std::string& local_name,
937 const std::string& initiator_name,
938 const std::string& sid,
939 const std::string& content_type,
940 SessionClient* client)
941 : BaseSession(session_manager->signaling_thread(),
942 session_manager->worker_thread(),
943 session_manager->port_allocator(),
944 sid, content_type, initiator_name == local_name) {
945 ASSERT(client != NULL);
946 session_manager_ = session_manager;
947 local_name_ = local_name;
948 initiator_name_ = initiator_name;
949 transport_parser_ = new P2PTransportParser();
950 client_ = client;
951 initiate_acked_ = false;
952 current_protocol_ = PROTOCOL_HYBRID;
953 }
954
~Session()955 Session::~Session() {
956 delete transport_parser_;
957 }
958
Initiate(const std::string & to,const SessionDescription * sdesc)959 bool Session::Initiate(const std::string& to,
960 const SessionDescription* sdesc) {
961 ASSERT(signaling_thread()->IsCurrent());
962 SessionError error;
963
964 // Only from STATE_INIT
965 if (state() != STATE_INIT)
966 return false;
967
968 // Setup for signaling.
969 set_remote_name(to);
970 set_local_description(sdesc);
971 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
972 &error)) {
973 LOG(LS_ERROR) << "Could not create transports: " << error.text;
974 return false;
975 }
976
977 if (!SendInitiateMessage(sdesc, &error)) {
978 LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
979 return false;
980 }
981
982 // We need to connect transport proxy and impl here so that we can process
983 // the TransportDescriptions.
984 SpeculativelyConnectAllTransportChannels();
985
986 PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL);
987 SetState(Session::STATE_SENTINITIATE);
988 return true;
989 }
990
Accept(const SessionDescription * sdesc)991 bool Session::Accept(const SessionDescription* sdesc) {
992 ASSERT(signaling_thread()->IsCurrent());
993
994 // Only if just received initiate
995 if (state() != STATE_RECEIVEDINITIATE)
996 return false;
997
998 // Setup for signaling.
999 set_local_description(sdesc);
1000
1001 SessionError error;
1002 if (!SendAcceptMessage(sdesc, &error)) {
1003 LOG(LS_ERROR) << "Could not send accept message: " << error.text;
1004 return false;
1005 }
1006 // TODO(juberti): Add BUNDLE support to transport-info messages.
1007 PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL);
1008 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported.
1009 SetState(Session::STATE_SENTACCEPT);
1010 return true;
1011 }
1012
Reject(const std::string & reason)1013 bool Session::Reject(const std::string& reason) {
1014 ASSERT(signaling_thread()->IsCurrent());
1015
1016 // Reject is sent in response to an initiate or modify, to reject the
1017 // request
1018 if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
1019 return false;
1020
1021 SessionError error;
1022 if (!SendRejectMessage(reason, &error)) {
1023 LOG(LS_ERROR) << "Could not send reject message: " << error.text;
1024 return false;
1025 }
1026
1027 SetState(STATE_SENTREJECT);
1028 return true;
1029 }
1030
TerminateWithReason(const std::string & reason)1031 bool Session::TerminateWithReason(const std::string& reason) {
1032 ASSERT(signaling_thread()->IsCurrent());
1033
1034 // Either side can terminate, at any time.
1035 switch (state()) {
1036 case STATE_SENTTERMINATE:
1037 case STATE_RECEIVEDTERMINATE:
1038 return false;
1039
1040 case STATE_SENTREJECT:
1041 case STATE_RECEIVEDREJECT:
1042 // We don't need to send terminate if we sent or received a reject...
1043 // it's implicit.
1044 break;
1045
1046 default:
1047 SessionError error;
1048 if (!SendTerminateMessage(reason, &error)) {
1049 LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
1050 return false;
1051 }
1052 break;
1053 }
1054
1055 SetState(STATE_SENTTERMINATE);
1056 return true;
1057 }
1058
SendInfoMessage(const XmlElements & elems,const std::string & remote_name)1059 bool Session::SendInfoMessage(const XmlElements& elems,
1060 const std::string& remote_name) {
1061 ASSERT(signaling_thread()->IsCurrent());
1062 SessionError error;
1063 if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
1064 LOG(LS_ERROR) << "Could not send info message " << error.text;
1065 return false;
1066 }
1067 return true;
1068 }
1069
SendDescriptionInfoMessage(const ContentInfos & contents)1070 bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
1071 XmlElements elems;
1072 WriteError write_error;
1073 if (!WriteDescriptionInfo(current_protocol_,
1074 contents,
1075 GetContentParsers(),
1076 &elems, &write_error)) {
1077 LOG(LS_ERROR) << "Could not write description info message: "
1078 << write_error.text;
1079 return false;
1080 }
1081 SessionError error;
1082 if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
1083 LOG(LS_ERROR) << "Could not send description info message: "
1084 << error.text;
1085 return false;
1086 }
1087 return true;
1088 }
1089
GetEmptyTransportInfos(const ContentInfos & contents) const1090 TransportInfos Session::GetEmptyTransportInfos(
1091 const ContentInfos& contents) const {
1092 TransportInfos tinfos;
1093 for (ContentInfos::const_iterator content = contents.begin();
1094 content != contents.end(); ++content) {
1095 tinfos.push_back(TransportInfo(content->name,
1096 TransportDescription(transport_type(),
1097 std::string(),
1098 std::string())));
1099 }
1100 return tinfos;
1101 }
1102
OnRemoteCandidates(const TransportInfos & tinfos,ParseError * error)1103 bool Session::OnRemoteCandidates(
1104 const TransportInfos& tinfos, ParseError* error) {
1105 for (TransportInfos::const_iterator tinfo = tinfos.begin();
1106 tinfo != tinfos.end(); ++tinfo) {
1107 std::string str_error;
1108 if (!BaseSession::OnRemoteCandidates(
1109 tinfo->content_name, tinfo->description.candidates, &str_error)) {
1110 return BadParse(str_error, error);
1111 }
1112 }
1113 return true;
1114 }
1115
CreateTransportProxies(const TransportInfos & tinfos,SessionError * error)1116 bool Session::CreateTransportProxies(const TransportInfos& tinfos,
1117 SessionError* error) {
1118 for (TransportInfos::const_iterator tinfo = tinfos.begin();
1119 tinfo != tinfos.end(); ++tinfo) {
1120 if (tinfo->description.transport_type != transport_type()) {
1121 error->SetText("No supported transport in offer.");
1122 return false;
1123 }
1124
1125 GetOrCreateTransportProxy(tinfo->content_name);
1126 }
1127 return true;
1128 }
1129
GetTransportParsers()1130 TransportParserMap Session::GetTransportParsers() {
1131 TransportParserMap parsers;
1132 parsers[transport_type()] = transport_parser_;
1133 return parsers;
1134 }
1135
GetCandidateTranslators()1136 CandidateTranslatorMap Session::GetCandidateTranslators() {
1137 CandidateTranslatorMap translators;
1138 // NOTE: This technique makes it impossible to parse G-ICE
1139 // candidates in session-initiate messages because the channels
1140 // aren't yet created at that point. Since we don't use candidates
1141 // in session-initiate messages, we should be OK. Once we switch to
1142 // ICE, this translation shouldn't be necessary.
1143 for (TransportMap::const_iterator iter = transport_proxies().begin();
1144 iter != transport_proxies().end(); ++iter) {
1145 translators[iter->first] = iter->second;
1146 }
1147 return translators;
1148 }
1149
GetContentParsers()1150 ContentParserMap Session::GetContentParsers() {
1151 ContentParserMap parsers;
1152 parsers[content_type()] = client_;
1153 // We need to be able parse both RTP-based and SCTP-based Jingle
1154 // with the same client.
1155 if (content_type() == NS_JINGLE_RTP) {
1156 parsers[NS_JINGLE_DRAFT_SCTP] = client_;
1157 }
1158 return parsers;
1159 }
1160
OnTransportRequestSignaling(Transport * transport)1161 void Session::OnTransportRequestSignaling(Transport* transport) {
1162 ASSERT(signaling_thread()->IsCurrent());
1163 TransportProxy* transproxy = GetTransportProxy(transport);
1164 ASSERT(transproxy != NULL);
1165 if (transproxy) {
1166 // Reset candidate allocation status for the transport proxy.
1167 transproxy->set_candidates_allocated(false);
1168 }
1169 SignalRequestSignaling(this);
1170 }
1171
OnTransportConnecting(Transport * transport)1172 void Session::OnTransportConnecting(Transport* transport) {
1173 // This is an indication that we should begin watching the writability
1174 // state of the transport.
1175 OnTransportWritable(transport);
1176 }
1177
OnTransportWritable(Transport * transport)1178 void Session::OnTransportWritable(Transport* transport) {
1179 ASSERT(signaling_thread()->IsCurrent());
1180
1181 // If the transport is not writable, start a timer to make sure that it
1182 // becomes writable within a reasonable amount of time. If it does not, we
1183 // terminate since we can't actually send data. If the transport is writable,
1184 // cancel the timer. Note that writability transitions may occur repeatedly
1185 // during the lifetime of the session.
1186 signaling_thread()->Clear(this, MSG_TIMEOUT);
1187 if (transport->HasChannels() && !transport->writable()) {
1188 signaling_thread()->PostDelayed(
1189 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
1190 }
1191 }
1192
OnTransportProxyCandidatesReady(TransportProxy * transproxy,const Candidates & candidates)1193 void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
1194 const Candidates& candidates) {
1195 ASSERT(signaling_thread()->IsCurrent());
1196 if (transproxy != NULL) {
1197 if (initiator() && !initiate_acked_) {
1198 // TODO: This is to work around server re-ordering
1199 // messages. We send the candidates once the session-initiate
1200 // is acked. Once we have fixed the server to guarantee message
1201 // order, we can remove this case.
1202 transproxy->AddUnsentCandidates(candidates);
1203 } else {
1204 if (!transproxy->negotiated()) {
1205 transproxy->AddSentCandidates(candidates);
1206 }
1207 SessionError error;
1208 if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
1209 LOG(LS_ERROR) << "Could not send transport info message: "
1210 << error.text;
1211 return;
1212 }
1213 }
1214 }
1215 }
1216
OnTransportSendError(Transport * transport,const buzz::XmlElement * stanza,const buzz::QName & name,const std::string & type,const std::string & text,const buzz::XmlElement * extra_info)1217 void Session::OnTransportSendError(Transport* transport,
1218 const buzz::XmlElement* stanza,
1219 const buzz::QName& name,
1220 const std::string& type,
1221 const std::string& text,
1222 const buzz::XmlElement* extra_info) {
1223 ASSERT(signaling_thread()->IsCurrent());
1224 SignalErrorMessage(this, stanza, name, type, text, extra_info);
1225 }
1226
OnIncomingMessage(const SessionMessage & msg)1227 void Session::OnIncomingMessage(const SessionMessage& msg) {
1228 ASSERT(signaling_thread()->IsCurrent());
1229 ASSERT(state() == STATE_INIT || msg.from == remote_name());
1230
1231 if (current_protocol_== PROTOCOL_HYBRID) {
1232 if (msg.protocol == PROTOCOL_GINGLE) {
1233 current_protocol_ = PROTOCOL_GINGLE;
1234 } else {
1235 current_protocol_ = PROTOCOL_JINGLE;
1236 }
1237 }
1238
1239 bool valid = false;
1240 MessageError error;
1241 switch (msg.type) {
1242 case ACTION_SESSION_INITIATE:
1243 valid = OnInitiateMessage(msg, &error);
1244 break;
1245 case ACTION_SESSION_INFO:
1246 valid = OnInfoMessage(msg);
1247 break;
1248 case ACTION_SESSION_ACCEPT:
1249 valid = OnAcceptMessage(msg, &error);
1250 break;
1251 case ACTION_SESSION_REJECT:
1252 valid = OnRejectMessage(msg, &error);
1253 break;
1254 case ACTION_SESSION_TERMINATE:
1255 valid = OnTerminateMessage(msg, &error);
1256 break;
1257 case ACTION_TRANSPORT_INFO:
1258 valid = OnTransportInfoMessage(msg, &error);
1259 break;
1260 case ACTION_TRANSPORT_ACCEPT:
1261 valid = OnTransportAcceptMessage(msg, &error);
1262 break;
1263 case ACTION_DESCRIPTION_INFO:
1264 valid = OnDescriptionInfoMessage(msg, &error);
1265 break;
1266 default:
1267 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
1268 "unknown session message type",
1269 &error);
1270 }
1271
1272 if (valid) {
1273 SendAcknowledgementMessage(msg.stanza);
1274 } else {
1275 SignalErrorMessage(this, msg.stanza, error.type,
1276 "modify", error.text, NULL);
1277 }
1278 }
1279
OnIncomingResponse(const buzz::XmlElement * orig_stanza,const buzz::XmlElement * response_stanza,const SessionMessage & msg)1280 void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
1281 const buzz::XmlElement* response_stanza,
1282 const SessionMessage& msg) {
1283 ASSERT(signaling_thread()->IsCurrent());
1284
1285 if (msg.type == ACTION_SESSION_INITIATE) {
1286 OnInitiateAcked();
1287 }
1288 }
1289
OnInitiateAcked()1290 void Session::OnInitiateAcked() {
1291 // TODO: This is to work around server re-ordering
1292 // messages. We send the candidates once the session-initiate
1293 // is acked. Once we have fixed the server to guarantee message
1294 // order, we can remove this case.
1295 if (!initiate_acked_) {
1296 initiate_acked_ = true;
1297 SessionError error;
1298 SendAllUnsentTransportInfoMessages(&error);
1299 }
1300 }
1301
OnFailedSend(const buzz::XmlElement * orig_stanza,const buzz::XmlElement * error_stanza)1302 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
1303 const buzz::XmlElement* error_stanza) {
1304 ASSERT(signaling_thread()->IsCurrent());
1305
1306 SessionMessage msg;
1307 ParseError parse_error;
1308 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
1309 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
1310 << ":" << orig_stanza;
1311 return;
1312 }
1313
1314 // If the error is a session redirect, call OnRedirectError, which will
1315 // continue the session with a new remote JID.
1316 SessionRedirect redirect;
1317 if (FindSessionRedirect(error_stanza, &redirect)) {
1318 SessionError error;
1319 if (!OnRedirectError(redirect, &error)) {
1320 // TODO: Should we send a message back? The standard
1321 // says nothing about it.
1322 std::ostringstream desc;
1323 desc << "Failed to redirect: " << error.text;
1324 LOG(LS_ERROR) << desc.str();
1325 SetError(ERROR_RESPONSE, desc.str());
1326 }
1327 return;
1328 }
1329
1330 std::string error_type = "cancel";
1331
1332 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
1333 if (error) {
1334 error_type = error->Attr(buzz::QN_TYPE);
1335
1336 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
1337 << "in response to:\n" << orig_stanza->Str();
1338 } else {
1339 // don't crash if <error> is missing
1340 LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
1341 return;
1342 }
1343
1344 if (msg.type == ACTION_TRANSPORT_INFO) {
1345 // Transport messages frequently generate errors because they are sent right
1346 // when we detect a network failure. For that reason, we ignore such
1347 // errors, because if we do not establish writability again, we will
1348 // terminate anyway. The exceptions are transport-specific error tags,
1349 // which we pass on to the respective transport.
1350 } else if ((error_type != "continue") && (error_type != "wait")) {
1351 // We do not set an error if the other side said it is okay to continue
1352 // (possibly after waiting). These errors can be ignored.
1353 SetError(ERROR_RESPONSE, "");
1354 }
1355 }
1356
OnInitiateMessage(const SessionMessage & msg,MessageError * error)1357 bool Session::OnInitiateMessage(const SessionMessage& msg,
1358 MessageError* error) {
1359 if (!CheckState(STATE_INIT, error))
1360 return false;
1361
1362 SessionInitiate init;
1363 if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
1364 GetContentParsers(), GetTransportParsers(),
1365 GetCandidateTranslators(),
1366 &init, error))
1367 return false;
1368
1369 SessionError session_error;
1370 if (!CreateTransportProxies(init.transports, &session_error)) {
1371 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
1372 session_error.text, error);
1373 }
1374
1375 set_remote_name(msg.from);
1376 set_initiator_name(msg.initiator);
1377 set_remote_description(new SessionDescription(init.ClearContents(),
1378 init.transports,
1379 init.groups));
1380 // Updating transport with TransportDescription.
1381 PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL);
1382 SetState(STATE_RECEIVEDINITIATE);
1383
1384 // Users of Session may listen to state change and call Reject().
1385 if (state() != STATE_SENTREJECT) {
1386 if (!OnRemoteCandidates(init.transports, error))
1387 return false;
1388
1389 // TODO(juberti): Auto-generate and push down the local transport answer.
1390 // This is necessary for trickling to work with RFC 5245 ICE.
1391 }
1392 return true;
1393 }
1394
OnAcceptMessage(const SessionMessage & msg,MessageError * error)1395 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
1396 if (!CheckState(STATE_SENTINITIATE, error))
1397 return false;
1398
1399 SessionAccept accept;
1400 if (!ParseSessionAccept(msg.protocol, msg.action_elem,
1401 GetContentParsers(), GetTransportParsers(),
1402 GetCandidateTranslators(),
1403 &accept, error)) {
1404 return false;
1405 }
1406
1407 // If we get an accept, we can assume the initiate has been
1408 // received, even if we haven't gotten an IQ response.
1409 OnInitiateAcked();
1410
1411 set_remote_description(new SessionDescription(accept.ClearContents(),
1412 accept.transports,
1413 accept.groups));
1414 // Updating transport with TransportDescription.
1415 PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL);
1416 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported.
1417 SetState(STATE_RECEIVEDACCEPT);
1418
1419 if (!OnRemoteCandidates(accept.transports, error))
1420 return false;
1421
1422 return true;
1423 }
1424
OnRejectMessage(const SessionMessage & msg,MessageError * error)1425 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
1426 if (!CheckState(STATE_SENTINITIATE, error))
1427 return false;
1428
1429 SetState(STATE_RECEIVEDREJECT);
1430 return true;
1431 }
1432
OnInfoMessage(const SessionMessage & msg)1433 bool Session::OnInfoMessage(const SessionMessage& msg) {
1434 SignalInfoMessage(this, msg.action_elem);
1435 return true;
1436 }
1437
OnTerminateMessage(const SessionMessage & msg,MessageError * error)1438 bool Session::OnTerminateMessage(const SessionMessage& msg,
1439 MessageError* error) {
1440 SessionTerminate term;
1441 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
1442 return false;
1443
1444 SignalReceivedTerminateReason(this, term.reason);
1445 if (term.debug_reason != buzz::STR_EMPTY) {
1446 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
1447 }
1448
1449 SetState(STATE_RECEIVEDTERMINATE);
1450 return true;
1451 }
1452
OnTransportInfoMessage(const SessionMessage & msg,MessageError * error)1453 bool Session::OnTransportInfoMessage(const SessionMessage& msg,
1454 MessageError* error) {
1455 TransportInfos tinfos;
1456 if (!ParseTransportInfos(msg.protocol, msg.action_elem,
1457 initiator_description()->contents(),
1458 GetTransportParsers(), GetCandidateTranslators(),
1459 &tinfos, error))
1460 return false;
1461
1462 if (!OnRemoteCandidates(tinfos, error))
1463 return false;
1464
1465 return true;
1466 }
1467
OnTransportAcceptMessage(const SessionMessage & msg,MessageError * error)1468 bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
1469 MessageError* error) {
1470 // TODO: Currently here only for compatibility with
1471 // Gingle 1.1 clients (notably, Google Voice).
1472 return true;
1473 }
1474
OnDescriptionInfoMessage(const SessionMessage & msg,MessageError * error)1475 bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
1476 MessageError* error) {
1477 if (!CheckState(STATE_INPROGRESS, error))
1478 return false;
1479
1480 DescriptionInfo description_info;
1481 if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
1482 GetContentParsers(), GetTransportParsers(),
1483 GetCandidateTranslators(),
1484 &description_info, error)) {
1485 return false;
1486 }
1487
1488 ContentInfos& updated_contents = description_info.contents;
1489
1490 // TODO: Currently, reflector sends back
1491 // video stream updates even for an audio-only call, which causes
1492 // this to fail. Put this back once reflector is fixed.
1493 //
1494 // ContentInfos::iterator it;
1495 // First, ensure all updates are valid before modifying remote_description_.
1496 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1497 // if (remote_description()->GetContentByName(it->name) == NULL) {
1498 // return false;
1499 // }
1500 // }
1501
1502 // TODO: We used to replace contents from an update, but
1503 // that no longer works with partial updates. We need to figure out
1504 // a way to merge patial updates into contents. For now, users of
1505 // Session should listen to SignalRemoteDescriptionUpdate and handle
1506 // updates. They should not expect remote_description to be the
1507 // latest value.
1508 //
1509 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1510 // remote_description()->RemoveContentByName(it->name);
1511 // remote_description()->AddContent(it->name, it->type, it->description);
1512 // }
1513 // }
1514
1515 SignalRemoteDescriptionUpdate(this, updated_contents);
1516 return true;
1517 }
1518
BareJidsEqual(const std::string & name1,const std::string & name2)1519 bool BareJidsEqual(const std::string& name1,
1520 const std::string& name2) {
1521 buzz::Jid jid1(name1);
1522 buzz::Jid jid2(name2);
1523
1524 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
1525 }
1526
OnRedirectError(const SessionRedirect & redirect,SessionError * error)1527 bool Session::OnRedirectError(const SessionRedirect& redirect,
1528 SessionError* error) {
1529 MessageError message_error;
1530 if (!CheckState(STATE_SENTINITIATE, &message_error)) {
1531 return BadWrite(message_error.text, error);
1532 }
1533
1534 if (!BareJidsEqual(remote_name(), redirect.target))
1535 return BadWrite("Redirection not allowed: must be the same bare jid.",
1536 error);
1537
1538 // When we receive a redirect, we point the session at the new JID
1539 // and resend the candidates.
1540 set_remote_name(redirect.target);
1541 return (SendInitiateMessage(local_description(), error) &&
1542 ResendAllTransportInfoMessages(error));
1543 }
1544
CheckState(State expected,MessageError * error)1545 bool Session::CheckState(State expected, MessageError* error) {
1546 if (state() != expected) {
1547 // The server can deliver messages out of order/repeated for various
1548 // reasons. For example, if the server does not recive our iq response,
1549 // it could assume that the iq it sent was lost, and will then send
1550 // it again. Ideally, we should implement reliable messaging with
1551 // duplicate elimination.
1552 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
1553 "message not allowed in current state",
1554 error);
1555 }
1556 return true;
1557 }
1558
SetError(Error error,const std::string & error_desc)1559 void Session::SetError(Error error, const std::string& error_desc) {
1560 BaseSession::SetError(error, error_desc);
1561 if (error != ERROR_NONE)
1562 signaling_thread()->Post(this, MSG_ERROR);
1563 }
1564
OnMessage(rtc::Message * pmsg)1565 void Session::OnMessage(rtc::Message* pmsg) {
1566 // preserve this because BaseSession::OnMessage may modify it
1567 State orig_state = state();
1568
1569 BaseSession::OnMessage(pmsg);
1570
1571 switch (pmsg->message_id) {
1572 case MSG_ERROR:
1573 TerminateWithReason(STR_TERMINATE_ERROR);
1574 break;
1575
1576 case MSG_STATE:
1577 switch (orig_state) {
1578 case STATE_SENTREJECT:
1579 case STATE_RECEIVEDREJECT:
1580 // Assume clean termination.
1581 Terminate();
1582 break;
1583
1584 case STATE_SENTTERMINATE:
1585 case STATE_RECEIVEDTERMINATE:
1586 session_manager_->DestroySession(this);
1587 break;
1588
1589 default:
1590 // Explicitly ignoring some states here.
1591 break;
1592 }
1593 break;
1594 }
1595 }
1596
SendInitiateMessage(const SessionDescription * sdesc,SessionError * error)1597 bool Session::SendInitiateMessage(const SessionDescription* sdesc,
1598 SessionError* error) {
1599 SessionInitiate init;
1600 init.contents = sdesc->contents();
1601 init.transports = GetEmptyTransportInfos(init.contents);
1602 init.groups = sdesc->groups();
1603 return SendMessage(ACTION_SESSION_INITIATE, init, error);
1604 }
1605
WriteSessionAction(SignalingProtocol protocol,const SessionInitiate & init,XmlElements * elems,WriteError * error)1606 bool Session::WriteSessionAction(
1607 SignalingProtocol protocol, const SessionInitiate& init,
1608 XmlElements* elems, WriteError* error) {
1609 return WriteSessionInitiate(protocol, init.contents, init.transports,
1610 GetContentParsers(), GetTransportParsers(),
1611 GetCandidateTranslators(), init.groups,
1612 elems, error);
1613 }
1614
SendAcceptMessage(const SessionDescription * sdesc,SessionError * error)1615 bool Session::SendAcceptMessage(const SessionDescription* sdesc,
1616 SessionError* error) {
1617 XmlElements elems;
1618 if (!WriteSessionAccept(current_protocol_,
1619 sdesc->contents(),
1620 GetEmptyTransportInfos(sdesc->contents()),
1621 GetContentParsers(), GetTransportParsers(),
1622 GetCandidateTranslators(), sdesc->groups(),
1623 &elems, error)) {
1624 return false;
1625 }
1626 return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
1627 }
1628
SendRejectMessage(const std::string & reason,SessionError * error)1629 bool Session::SendRejectMessage(const std::string& reason,
1630 SessionError* error) {
1631 SessionTerminate term(reason);
1632 return SendMessage(ACTION_SESSION_REJECT, term, error);
1633 }
1634
SendTerminateMessage(const std::string & reason,SessionError * error)1635 bool Session::SendTerminateMessage(const std::string& reason,
1636 SessionError* error) {
1637 SessionTerminate term(reason);
1638 return SendMessage(ACTION_SESSION_TERMINATE, term, error);
1639 }
1640
WriteSessionAction(SignalingProtocol protocol,const SessionTerminate & term,XmlElements * elems,WriteError * error)1641 bool Session::WriteSessionAction(SignalingProtocol protocol,
1642 const SessionTerminate& term,
1643 XmlElements* elems, WriteError* error) {
1644 WriteSessionTerminate(protocol, term, elems);
1645 return true;
1646 }
1647
SendTransportInfoMessage(const TransportInfo & tinfo,SessionError * error)1648 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
1649 SessionError* error) {
1650 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
1651 }
1652
SendTransportInfoMessage(const TransportProxy * transproxy,const Candidates & candidates,SessionError * error)1653 bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
1654 const Candidates& candidates,
1655 SessionError* error) {
1656 return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
1657 TransportDescription(transproxy->type(), std::vector<std::string>(),
1658 std::string(), std::string(), ICEMODE_FULL,
1659 CONNECTIONROLE_NONE, NULL, candidates)), error);
1660 }
1661
WriteSessionAction(SignalingProtocol protocol,const TransportInfo & tinfo,XmlElements * elems,WriteError * error)1662 bool Session::WriteSessionAction(SignalingProtocol protocol,
1663 const TransportInfo& tinfo,
1664 XmlElements* elems, WriteError* error) {
1665 TransportInfos tinfos;
1666 tinfos.push_back(tinfo);
1667 return WriteTransportInfos(protocol, tinfos,
1668 GetTransportParsers(), GetCandidateTranslators(),
1669 elems, error);
1670 }
1671
ResendAllTransportInfoMessages(SessionError * error)1672 bool Session::ResendAllTransportInfoMessages(SessionError* error) {
1673 for (TransportMap::const_iterator iter = transport_proxies().begin();
1674 iter != transport_proxies().end(); ++iter) {
1675 TransportProxy* transproxy = iter->second;
1676 if (transproxy->sent_candidates().size() > 0) {
1677 if (!SendTransportInfoMessage(
1678 transproxy, transproxy->sent_candidates(), error)) {
1679 LOG(LS_ERROR) << "Could not resend transport info messages: "
1680 << error->text;
1681 return false;
1682 }
1683 transproxy->ClearSentCandidates();
1684 }
1685 }
1686 return true;
1687 }
1688
SendAllUnsentTransportInfoMessages(SessionError * error)1689 bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
1690 for (TransportMap::const_iterator iter = transport_proxies().begin();
1691 iter != transport_proxies().end(); ++iter) {
1692 TransportProxy* transproxy = iter->second;
1693 if (transproxy->unsent_candidates().size() > 0) {
1694 if (!SendTransportInfoMessage(
1695 transproxy, transproxy->unsent_candidates(), error)) {
1696 LOG(LS_ERROR) << "Could not send unsent transport info messages: "
1697 << error->text;
1698 return false;
1699 }
1700 transproxy->ClearUnsentCandidates();
1701 }
1702 }
1703 return true;
1704 }
1705
SendMessage(ActionType type,const XmlElements & action_elems,SessionError * error)1706 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1707 SessionError* error) {
1708 return SendMessage(type, action_elems, remote_name(), error);
1709 }
1710
SendMessage(ActionType type,const XmlElements & action_elems,const std::string & remote_name,SessionError * error)1711 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1712 const std::string& remote_name, SessionError* error) {
1713 rtc::scoped_ptr<buzz::XmlElement> stanza(
1714 new buzz::XmlElement(buzz::QN_IQ));
1715
1716 SessionMessage msg(current_protocol_, type, id(), initiator_name());
1717 msg.to = remote_name;
1718 WriteSessionMessage(msg, action_elems, stanza.get());
1719
1720 SignalOutgoingMessage(this, stanza.get());
1721 return true;
1722 }
1723
1724 template <typename Action>
SendMessage(ActionType type,const Action & action,SessionError * error)1725 bool Session::SendMessage(ActionType type, const Action& action,
1726 SessionError* error) {
1727 rtc::scoped_ptr<buzz::XmlElement> stanza(
1728 new buzz::XmlElement(buzz::QN_IQ));
1729 if (!WriteActionMessage(type, action, stanza.get(), error))
1730 return false;
1731
1732 SignalOutgoingMessage(this, stanza.get());
1733 return true;
1734 }
1735
1736 template <typename Action>
WriteActionMessage(ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1737 bool Session::WriteActionMessage(ActionType type, const Action& action,
1738 buzz::XmlElement* stanza,
1739 WriteError* error) {
1740 if (current_protocol_ == PROTOCOL_HYBRID) {
1741 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
1742 return false;
1743 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
1744 return false;
1745 } else {
1746 if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
1747 return false;
1748 }
1749 return true;
1750 }
1751
1752 template <typename Action>
WriteActionMessage(SignalingProtocol protocol,ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1753 bool Session::WriteActionMessage(SignalingProtocol protocol,
1754 ActionType type, const Action& action,
1755 buzz::XmlElement* stanza, WriteError* error) {
1756 XmlElements action_elems;
1757 if (!WriteSessionAction(protocol, action, &action_elems, error))
1758 return false;
1759
1760 SessionMessage msg(protocol, type, id(), initiator_name());
1761 msg.to = remote_name();
1762
1763 WriteSessionMessage(msg, action_elems, stanza);
1764 return true;
1765 }
1766
SendAcknowledgementMessage(const buzz::XmlElement * stanza)1767 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
1768 rtc::scoped_ptr<buzz::XmlElement> ack(
1769 new buzz::XmlElement(buzz::QN_IQ));
1770 ack->SetAttr(buzz::QN_TO, remote_name());
1771 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
1772 ack->SetAttr(buzz::QN_TYPE, "result");
1773
1774 SignalOutgoingMessage(this, ack.get());
1775 }
1776
1777 } // namespace cricket
1778