1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include "talk/p2p/base/session.h"
29 #include "talk/base/common.h"
30 #include "talk/base/logging.h"
31 #include "talk/base/helpers.h"
32 #include "talk/base/scoped_ptr.h"
33 #include "talk/xmpp/constants.h"
34 #include "talk/xmpp/jid.h"
35 #include "talk/p2p/base/sessionclient.h"
36 #include "talk/p2p/base/transport.h"
37 #include "talk/p2p/base/transportchannelproxy.h"
38 #include "talk/p2p/base/p2ptransport.h"
39 #include "talk/p2p/base/p2ptransportchannel.h"
40
41 #include "talk/p2p/base/constants.h"
42
43 namespace {
44
45 const uint32 MSG_TIMEOUT = 1;
46 const uint32 MSG_ERROR = 2;
47 const uint32 MSG_STATE = 3;
48
49 } // namespace
50
51 namespace cricket {
52
BadMessage(const buzz::QName type,const std::string & text,MessageError * err)53 bool BadMessage(const buzz::QName type,
54 const std::string& text,
55 MessageError* err) {
56 err->SetType(type);
57 err->SetText(text);
58 return false;
59 }
60
~TransportProxy()61 TransportProxy::~TransportProxy() {
62 for (ChannelMap::iterator iter = channels_.begin();
63 iter != channels_.end(); ++iter) {
64 iter->second->SignalDestroyed(iter->second);
65 delete iter->second;
66 }
67 delete transport_;
68 }
69
type() const70 std::string TransportProxy::type() const {
71 return transport_->type();
72 }
73
GetChannel(const std::string & name)74 TransportChannel* TransportProxy::GetChannel(const std::string& name) {
75 return GetProxy(name);
76 }
77
CreateChannel(const std::string & name,const std::string & content_type)78 TransportChannel* TransportProxy::CreateChannel(
79 const std::string& name, const std::string& content_type) {
80 ASSERT(GetChannel(name) == NULL);
81 ASSERT(!transport_->HasChannel(name));
82
83 // We always create a proxy in case we need to change out the transport later.
84 TransportChannelProxy* channel =
85 new TransportChannelProxy(name, content_type);
86 channels_[name] = channel;
87
88 if (state_ == STATE_NEGOTIATED) {
89 SetProxyImpl(name, channel);
90 } else if (state_ == STATE_CONNECTING) {
91 GetOrCreateImpl(name, content_type);
92 }
93 return channel;
94 }
95
DestroyChannel(const std::string & name)96 void TransportProxy::DestroyChannel(const std::string& name) {
97 TransportChannel* channel = GetChannel(name);
98 if (channel) {
99 channels_.erase(name);
100 channel->SignalDestroyed(channel);
101 delete channel;
102 }
103 }
104
SpeculativelyConnectChannels()105 void TransportProxy::SpeculativelyConnectChannels() {
106 ASSERT(state_ == STATE_INIT || state_ == STATE_CONNECTING);
107 state_ = STATE_CONNECTING;
108 for (ChannelMap::iterator iter = channels_.begin();
109 iter != channels_.end(); ++iter) {
110 GetOrCreateImpl(iter->first, iter->second->content_type());
111 }
112 transport_->ConnectChannels();
113 }
114
CompleteNegotiation()115 void TransportProxy::CompleteNegotiation() {
116 if (state_ != STATE_NEGOTIATED) {
117 state_ = STATE_NEGOTIATED;
118 for (ChannelMap::iterator iter = channels_.begin();
119 iter != channels_.end(); ++iter) {
120 SetProxyImpl(iter->first, iter->second);
121 }
122 transport_->ConnectChannels();
123 }
124 }
125
AddSentCandidates(const Candidates & candidates)126 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
127 for (Candidates::const_iterator cand = candidates.begin();
128 cand != candidates.end(); ++cand) {
129 sent_candidates_.push_back(*cand);
130 }
131 }
132
133
GetProxy(const std::string & name)134 TransportChannelProxy* TransportProxy::GetProxy(const std::string& name) {
135 ChannelMap::iterator iter = channels_.find(name);
136 return (iter != channels_.end()) ? iter->second : NULL;
137 }
138
GetOrCreateImpl(const std::string & name,const std::string & content_type)139 TransportChannelImpl* TransportProxy::GetOrCreateImpl(
140 const std::string& name, const std::string& content_type) {
141 TransportChannelImpl* impl = transport_->GetChannel(name);
142 if (impl == NULL) {
143 impl = transport_->CreateChannel(name, content_type);
144 }
145 return impl;
146 }
147
SetProxyImpl(const std::string & name,TransportChannelProxy * proxy)148 void TransportProxy::SetProxyImpl(
149 const std::string& name, TransportChannelProxy* proxy) {
150 TransportChannelImpl* impl = GetOrCreateImpl(name, proxy->content_type());
151 ASSERT(impl != NULL);
152 proxy->SetImplementation(impl);
153 }
154
155
156
157
BaseSession(talk_base::Thread * signaling_thread)158 BaseSession::BaseSession(talk_base::Thread *signaling_thread)
159 : state_(STATE_INIT), error_(ERROR_NONE),
160 local_description_(NULL), remote_description_(NULL),
161 signaling_thread_(signaling_thread) {
162 }
163
~BaseSession()164 BaseSession::~BaseSession() {
165 delete remote_description_;
166 delete local_description_;
167 }
168
SetState(State state)169 void BaseSession::SetState(State state) {
170 ASSERT(signaling_thread_->IsCurrent());
171 if (state != state_) {
172 state_ = state;
173 SignalState(this, state_);
174 signaling_thread_->Post(this, MSG_STATE);
175 }
176 }
177
SetError(Error error)178 void BaseSession::SetError(Error error) {
179 ASSERT(signaling_thread_->IsCurrent());
180 if (error != error_) {
181 error_ = error;
182 SignalError(this, error);
183 }
184 }
185
OnMessage(talk_base::Message * pmsg)186 void BaseSession::OnMessage(talk_base::Message *pmsg) {
187 switch (pmsg->message_id) {
188 case MSG_TIMEOUT:
189 // Session timeout has occured.
190 SetError(ERROR_TIME);
191 break;
192
193 case MSG_ERROR:
194 TerminateWithReason(STR_TERMINATE_ERROR);
195 break;
196
197 case MSG_STATE:
198 switch (state_) {
199 case STATE_SENTACCEPT:
200 case STATE_RECEIVEDACCEPT:
201 SetState(STATE_INPROGRESS);
202 break;
203
204 case STATE_SENTREJECT:
205 case STATE_RECEIVEDREJECT:
206 // Assume clean termination.
207 Terminate();
208 break;
209
210 default:
211 // Explicitly ignoring some states here.
212 break;
213 }
214 break;
215 }
216 }
217
218
Session(SessionManager * session_manager,const std::string & local_name,const std::string & initiator_name,const std::string & sid,const std::string & content_type,SessionClient * client)219 Session::Session(SessionManager *session_manager,
220 const std::string& local_name,
221 const std::string& initiator_name,
222 const std::string& sid, const std::string& content_type,
223 SessionClient* client) :
224 BaseSession(session_manager->signaling_thread()) {
225 ASSERT(session_manager->signaling_thread()->IsCurrent());
226 ASSERT(client != NULL);
227 session_manager_ = session_manager;
228 local_name_ = local_name;
229 sid_ = sid;
230 initiator_name_ = initiator_name;
231 content_type_ = content_type;
232 // TODO: Once we support different transport types,
233 // don't hard code this here.
234 transport_type_ = NS_GINGLE_P2P;
235 transport_parser_ = new P2PTransportParser();
236 client_ = client;
237 error_ = ERROR_NONE;
238 state_ = STATE_INIT;
239 initiator_ = false;
240 current_protocol_ = PROTOCOL_HYBRID;
241 }
242
~Session()243 Session::~Session() {
244 ASSERT(signaling_thread_->IsCurrent());
245
246 ASSERT(state_ != STATE_DEINIT);
247 state_ = STATE_DEINIT;
248 SignalState(this, state_);
249
250 for (TransportMap::iterator iter = transports_.begin();
251 iter != transports_.end(); ++iter) {
252 delete iter->second;
253 }
254
255 delete transport_parser_;
256 }
257
GetTransport(const std::string & content_name)258 Transport* Session::GetTransport(const std::string& content_name) {
259 TransportProxy* transproxy = GetTransportProxy(content_name);
260 if (transproxy == NULL)
261 return NULL;
262 return transproxy->impl();
263 }
264
set_allow_local_ips(bool allow)265 void Session::set_allow_local_ips(bool allow) {
266 allow_local_ips_ = allow;
267 for (TransportMap::iterator iter = transports_.begin();
268 iter != transports_.end(); ++iter) {
269 iter->second->impl()->set_allow_local_ips(allow);
270 }
271 }
272
Initiate(const std::string & to,const SessionDescription * sdesc)273 bool Session::Initiate(const std::string &to,
274 const SessionDescription* sdesc) {
275 ASSERT(signaling_thread_->IsCurrent());
276 SessionError error;
277
278 // Only from STATE_INIT
279 if (state_ != STATE_INIT)
280 return false;
281
282 // Setup for signaling.
283 remote_name_ = to;
284 initiator_ = true;
285 set_local_description(sdesc);
286 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
287 &error)) {
288 LOG(LS_ERROR) << "Could not create transports: " << error.text;
289 return false;
290 }
291
292 if (!SendInitiateMessage(sdesc, &error)) {
293 LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
294 return false;
295 }
296
297 SetState(Session::STATE_SENTINITIATE);
298
299 SpeculativelyConnectAllTransportChannels();
300 return true;
301 }
302
Accept(const SessionDescription * sdesc)303 bool Session::Accept(const SessionDescription* sdesc) {
304 ASSERT(signaling_thread_->IsCurrent());
305
306 // Only if just received initiate
307 if (state_ != STATE_RECEIVEDINITIATE)
308 return false;
309
310 // Setup for signaling.
311 initiator_ = false;
312 set_local_description(sdesc);
313
314 SessionError error;
315 if (!SendAcceptMessage(sdesc, &error)) {
316 LOG(LS_ERROR) << "Could not send accept message: " << error.text;
317 return false;
318 }
319
320 SetState(Session::STATE_SENTACCEPT);
321 return true;
322 }
323
Reject(const std::string & reason)324 bool Session::Reject(const std::string& reason) {
325 ASSERT(signaling_thread_->IsCurrent());
326
327 // Reject is sent in response to an initiate or modify, to reject the
328 // request
329 if (state_ != STATE_RECEIVEDINITIATE && state_ != STATE_RECEIVEDMODIFY)
330 return false;
331
332 // Setup for signaling.
333 initiator_ = false;
334
335 SessionError error;
336 if (!SendRejectMessage(reason, &error)) {
337 LOG(LS_ERROR) << "Could not send reject message: " << error.text;
338 return false;
339 }
340
341 SetState(STATE_SENTREJECT);
342 return true;
343 }
344
TerminateWithReason(const std::string & reason)345 bool Session::TerminateWithReason(const std::string& reason) {
346 ASSERT(signaling_thread_->IsCurrent());
347
348 // Either side can terminate, at any time.
349 switch (state_) {
350 case STATE_SENTTERMINATE:
351 case STATE_RECEIVEDTERMINATE:
352 return false;
353
354 case STATE_SENTREJECT:
355 case STATE_RECEIVEDREJECT:
356 // We don't need to send terminate if we sent or received a reject...
357 // it's implicit.
358 break;
359
360 default:
361 SessionError error;
362 if (!SendTerminateMessage(reason, &error)) {
363 LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
364 return false;
365 }
366 break;
367 }
368
369 SetState(STATE_SENTTERMINATE);
370 return true;
371 }
372
SendInfoMessage(const XmlElements & elems)373 bool Session::SendInfoMessage(const XmlElements& elems) {
374 ASSERT(signaling_thread_->IsCurrent());
375 SessionError error;
376 if (!SendMessage(ACTION_SESSION_INFO, elems, &error)) {
377 LOG(LS_ERROR) << "Could not send info message " << error.text;
378 return false;
379 }
380 return true;
381 }
382
383
GetTransportProxy(const Transport * transport)384 TransportProxy* Session::GetTransportProxy(const Transport* transport) {
385 for (TransportMap::iterator iter = transports_.begin();
386 iter != transports_.end(); ++iter) {
387 TransportProxy* transproxy = iter->second;
388 if (transproxy->impl() == transport) {
389 return transproxy;
390 }
391 }
392 return NULL;
393 }
394
GetTransportProxy(const std::string & content_name)395 TransportProxy* Session::GetTransportProxy(const std::string& content_name) {
396 TransportMap::iterator iter = transports_.find(content_name);
397 return (iter != transports_.end()) ? iter->second : NULL;
398 }
399
GetFirstTransportProxy()400 TransportProxy* Session::GetFirstTransportProxy() {
401 if (transports_.empty())
402 return NULL;
403 return transports_.begin()->second;
404 }
405
GetEmptyTransportInfos(const ContentInfos & contents) const406 TransportInfos Session::GetEmptyTransportInfos(
407 const ContentInfos& contents) const {
408 TransportInfos tinfos;
409 for (ContentInfos::const_iterator content = contents.begin();
410 content != contents.end(); ++content) {
411 tinfos.push_back(
412 TransportInfo(content->name, transport_type_, Candidates()));
413 }
414 return tinfos;
415 }
416
417
OnRemoteCandidates(const TransportInfos & tinfos,ParseError * error)418 bool Session::OnRemoteCandidates(
419 const TransportInfos& tinfos, ParseError* error) {
420 for (TransportInfos::const_iterator tinfo = tinfos.begin();
421 tinfo != tinfos.end(); ++tinfo) {
422 TransportProxy* transproxy = GetTransportProxy(tinfo->content_name);
423 if (transproxy == NULL) {
424 return BadParse("Unknown content name: " + tinfo->content_name, error);
425 }
426
427 // Must complete negotiation before sending remote candidates, or
428 // there won't be any channel impls.
429 transproxy->CompleteNegotiation();
430 for (Candidates::const_iterator cand = tinfo->candidates.begin();
431 cand != tinfo->candidates.end(); ++cand) {
432 if (!transproxy->impl()->VerifyCandidate(*cand, error))
433 return false;
434
435 if (!transproxy->impl()->HasChannel(cand->name())) {
436 buzz::XmlElement* extra_info =
437 new buzz::XmlElement(QN_GINGLE_P2P_UNKNOWN_CHANNEL_NAME);
438 extra_info->AddAttr(buzz::QN_NAME, cand->name());
439 error->extra = extra_info;
440
441 return BadParse("channel named in candidate does not exist: " +
442 cand->name() + " for content: "+ tinfo->content_name,
443 error);
444 }
445 }
446 transproxy->impl()->OnRemoteCandidates(tinfo->candidates);
447 }
448
449 return true;
450 }
451
452
GetOrCreateTransportProxy(const std::string & content_name)453 TransportProxy* Session::GetOrCreateTransportProxy(
454 const std::string& content_name) {
455 TransportProxy* transproxy = GetTransportProxy(content_name);
456 if (transproxy)
457 return transproxy;
458
459 Transport* transport =
460 new P2PTransport(signaling_thread_,
461 session_manager_->worker_thread(),
462 session_manager_->port_allocator());
463 transport->set_allow_local_ips(allow_local_ips_);
464 transport->SignalConnecting.connect(
465 this, &Session::OnTransportConnecting);
466 transport->SignalWritableState.connect(
467 this, &Session::OnTransportWritable);
468 transport->SignalRequestSignaling.connect(
469 this, &Session::OnTransportRequestSignaling);
470 transport->SignalCandidatesReady.connect(
471 this, &Session::OnTransportCandidatesReady);
472 transport->SignalTransportError.connect(
473 this, &Session::OnTransportSendError);
474 transport->SignalChannelGone.connect(
475 this, &Session::OnTransportChannelGone);
476
477 transproxy = new TransportProxy(content_name, transport);
478 transports_[content_name] = transproxy;
479
480 return transproxy;
481 }
482
CreateTransportProxies(const TransportInfos & tinfos,SessionError * error)483 bool Session::CreateTransportProxies(const TransportInfos& tinfos,
484 SessionError* error) {
485 for (TransportInfos::const_iterator tinfo = tinfos.begin();
486 tinfo != tinfos.end(); ++tinfo) {
487 if (tinfo->transport_type != transport_type_) {
488 error->SetText("No supported transport in offer.");
489 return false;
490 }
491
492 GetOrCreateTransportProxy(tinfo->content_name);
493 }
494 return true;
495 }
496
SpeculativelyConnectAllTransportChannels()497 void Session::SpeculativelyConnectAllTransportChannels() {
498 for (TransportMap::iterator iter = transports_.begin();
499 iter != transports_.end(); ++iter) {
500 iter->second->SpeculativelyConnectChannels();
501 }
502 }
503
GetTransportParsers()504 TransportParserMap Session::GetTransportParsers() {
505 TransportParserMap parsers;
506 parsers[transport_type_] = transport_parser_;
507 return parsers;
508 }
509
GetContentParsers()510 ContentParserMap Session::GetContentParsers() {
511 ContentParserMap parsers;
512 parsers[content_type_] = client_;
513 return parsers;
514 }
515
CreateChannel(const std::string & content_name,const std::string & channel_name)516 TransportChannel* Session::CreateChannel(const std::string& content_name,
517 const std::string& channel_name) {
518 // We create the proxy "on demand" here because we need to support
519 // creating channels at any time, even before we send or receive
520 // initiate messages, which is before we create the transports.
521 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
522 return transproxy->CreateChannel(channel_name, content_type_);
523 }
524
GetChannel(const std::string & content_name,const std::string & channel_name)525 TransportChannel* Session::GetChannel(const std::string& content_name,
526 const std::string& channel_name) {
527 TransportProxy* transproxy = GetTransportProxy(content_name);
528 if (transproxy == NULL)
529 return NULL;
530 else
531 return transproxy->GetChannel(channel_name);
532 }
533
DestroyChannel(const std::string & content_name,const std::string & channel_name)534 void Session::DestroyChannel(const std::string& content_name,
535 const std::string& channel_name) {
536 TransportProxy* transproxy = GetTransportProxy(content_name);
537 ASSERT(transproxy != NULL);
538 transproxy->DestroyChannel(channel_name);
539 }
540
OnSignalingReady()541 void Session::OnSignalingReady() {
542 ASSERT(signaling_thread_->IsCurrent());
543 for (TransportMap::iterator iter = transports_.begin();
544 iter != transports_.end(); ++iter) {
545 iter->second->impl()->OnSignalingReady();
546 }
547 }
548
OnTransportConnecting(Transport * transport)549 void Session::OnTransportConnecting(Transport* transport) {
550 // This is an indication that we should begin watching the writability
551 // state of the transport.
552 OnTransportWritable(transport);
553 }
554
OnTransportWritable(Transport * transport)555 void Session::OnTransportWritable(Transport* transport) {
556 ASSERT(signaling_thread_->IsCurrent());
557
558 // If the transport is not writable, start a timer to make sure that it
559 // becomes writable within a reasonable amount of time. If it does not, we
560 // terminate since we can't actually send data. If the transport is writable,
561 // cancel the timer. Note that writability transitions may occur repeatedly
562 // during the lifetime of the session.
563 signaling_thread_->Clear(this, MSG_TIMEOUT);
564 if (transport->HasChannels() && !transport->writable()) {
565 signaling_thread_->PostDelayed(
566 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
567 }
568 }
569
OnTransportRequestSignaling(Transport * transport)570 void Session::OnTransportRequestSignaling(Transport* transport) {
571 ASSERT(signaling_thread_->IsCurrent());
572 SignalRequestSignaling(this);
573 }
574
OnTransportCandidatesReady(Transport * transport,const Candidates & candidates)575 void Session::OnTransportCandidatesReady(Transport* transport,
576 const Candidates& candidates) {
577 ASSERT(signaling_thread_->IsCurrent());
578 TransportProxy* transproxy = GetTransportProxy(transport);
579 if (transproxy != NULL) {
580 if (!transproxy->negotiated()) {
581 transproxy->AddSentCandidates(candidates);
582 }
583 SessionError error;
584 if (!SendTransportInfoMessage(
585 TransportInfo(transproxy->content_name(), transproxy->type(),
586 candidates),
587 &error)) {
588 LOG(LS_ERROR) << "Could not send transport info message: "
589 << error.text;
590 return;
591 }
592 }
593 }
594
OnTransportSendError(Transport * transport,const buzz::XmlElement * stanza,const buzz::QName & name,const std::string & type,const std::string & text,const buzz::XmlElement * extra_info)595 void Session::OnTransportSendError(Transport* transport,
596 const buzz::XmlElement* stanza,
597 const buzz::QName& name,
598 const std::string& type,
599 const std::string& text,
600 const buzz::XmlElement* extra_info) {
601 ASSERT(signaling_thread_->IsCurrent());
602 SignalErrorMessage(this, stanza, name, type, text, extra_info);
603 }
604
OnTransportChannelGone(Transport * transport,const std::string & name)605 void Session::OnTransportChannelGone(Transport* transport,
606 const std::string& name) {
607 ASSERT(signaling_thread_->IsCurrent());
608 SignalChannelGone(this, name);
609 }
610
OnIncomingMessage(const SessionMessage & msg)611 void Session::OnIncomingMessage(const SessionMessage& msg) {
612 ASSERT(signaling_thread_->IsCurrent());
613 ASSERT(state_ == STATE_INIT || msg.from == remote_name_);
614
615 if (current_protocol_== PROTOCOL_HYBRID) {
616 if (msg.protocol == PROTOCOL_GINGLE) {
617 current_protocol_ = PROTOCOL_GINGLE;
618 } else {
619 current_protocol_ = PROTOCOL_JINGLE;
620 }
621 }
622
623 bool valid = false;
624 MessageError error;
625 switch (msg.type) {
626 case ACTION_SESSION_INITIATE:
627 valid = OnInitiateMessage(msg, &error);
628 break;
629 case ACTION_SESSION_INFO:
630 valid = OnInfoMessage(msg);
631 break;
632 case ACTION_SESSION_ACCEPT:
633 valid = OnAcceptMessage(msg, &error);
634 break;
635 case ACTION_SESSION_REJECT:
636 valid = OnRejectMessage(msg, &error);
637 break;
638 case ACTION_SESSION_TERMINATE:
639 valid = OnTerminateMessage(msg, &error);
640 break;
641 case ACTION_TRANSPORT_INFO:
642 valid = OnTransportInfoMessage(msg, &error);
643 break;
644 case ACTION_TRANSPORT_ACCEPT:
645 valid = OnTransportAcceptMessage(msg, &error);
646 break;
647 case ACTION_NOTIFY:
648 valid = OnNotifyMessage(msg, &error);
649 break;
650 case ACTION_UPDATE:
651 valid = OnUpdateMessage(msg, &error);
652 break;
653 default:
654 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
655 "unknown session message type",
656 &error);
657 }
658
659 if (valid) {
660 SendAcknowledgementMessage(msg.stanza);
661 } else {
662 SignalErrorMessage(this, msg.stanza, error.type,
663 "modify", error.text, NULL);
664 }
665 }
666
OnFailedSend(const buzz::XmlElement * orig_stanza,const buzz::XmlElement * error_stanza)667 void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
668 const buzz::XmlElement* error_stanza) {
669 ASSERT(signaling_thread_->IsCurrent());
670
671 SessionMessage msg;
672 ParseError parse_error;
673 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
674 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
675 << ":" << orig_stanza;
676 return;
677 }
678
679 // If the error is a session redirect, call OnRedirectError, which will
680 // continue the session with a new remote JID.
681 SessionRedirect redirect;
682 if (FindSessionRedirect(error_stanza, &redirect)) {
683 SessionError error;
684 if (!OnRedirectError(redirect, &error)) {
685 // TODO: Should we send a message back? The standard
686 // says nothing about it.
687 LOG(LS_ERROR) << "Failed to redirect: " << error.text;
688 SetError(ERROR_RESPONSE);
689 }
690 return;
691 }
692
693 std::string error_type = "cancel";
694
695 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
696 if (error) {
697 ASSERT(error->HasAttr(buzz::QN_TYPE));
698 error_type = error->Attr(buzz::QN_TYPE);
699
700 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
701 << "in response to:\n" << orig_stanza->Str();
702 } else {
703 // don't crash if <error> is missing
704 LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
705 return;
706 }
707
708 if (msg.type == ACTION_TRANSPORT_INFO) {
709 // Transport messages frequently generate errors because they are sent right
710 // when we detect a network failure. For that reason, we ignore such
711 // errors, because if we do not establish writability again, we will
712 // terminate anyway. The exceptions are transport-specific error tags,
713 // which we pass on to the respective transport.
714
715 // TODO: This is only used for unknown channel name.
716 // For Jingle, find a stanard-compliant way of doing this. For
717 // Gingle, guess the content name based on the channel name.
718 for (const buzz::XmlElement* elem = error->FirstElement();
719 NULL != elem; elem = elem->NextElement()) {
720 TransportProxy* transproxy = GetFirstTransportProxy();
721 if (transproxy && transproxy->type() == error->Name().Namespace()) {
722 transproxy->impl()->OnTransportError(elem);
723 }
724 }
725 } else if ((error_type != "continue") && (error_type != "wait")) {
726 // We do not set an error if the other side said it is okay to continue
727 // (possibly after waiting). These errors can be ignored.
728 SetError(ERROR_RESPONSE);
729 }
730 }
731
OnInitiateMessage(const SessionMessage & msg,MessageError * error)732 bool Session::OnInitiateMessage(const SessionMessage& msg,
733 MessageError* error) {
734 if (!CheckState(STATE_INIT, error))
735 return false;
736
737 SessionInitiate init;
738 if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
739 GetContentParsers(), GetTransportParsers(),
740 &init, error))
741 return false;
742
743 SessionError session_error;
744 if (!CreateTransportProxies(init.transports, &session_error)) {
745 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
746 session_error.text, error);
747 }
748
749 initiator_ = false;
750 remote_name_ = msg.from;
751 set_remote_description(new SessionDescription(init.ClearContents()));
752 SetState(STATE_RECEIVEDINITIATE);
753
754 // Users of Session may listen to state change and call Reject().
755 if (state_ != STATE_SENTREJECT) {
756 if (!OnRemoteCandidates(init.transports, error))
757 return false;
758 }
759 return true;
760 }
761
OnAcceptMessage(const SessionMessage & msg,MessageError * error)762 bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
763 if (!CheckState(STATE_SENTINITIATE, error))
764 return false;
765
766 SessionAccept accept;
767 if (!ParseSessionAccept(msg.protocol, msg.action_elem,
768 GetContentParsers(), GetTransportParsers(),
769 &accept, error))
770 return false;
771
772 set_remote_description(new SessionDescription(accept.ClearContents()));
773 SetState(STATE_RECEIVEDACCEPT);
774
775 // Users of Session may listen to state change and call Reject().
776 if (state_ != STATE_SENTREJECT) {
777 if (!OnRemoteCandidates(accept.transports, error))
778 return false;
779 }
780
781 return true;
782 }
783
OnRejectMessage(const SessionMessage & msg,MessageError * error)784 bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
785 if (!CheckState(STATE_SENTINITIATE, error))
786 return false;
787
788 SetState(STATE_RECEIVEDREJECT);
789 return true;
790 }
791
792 // Only used by app/win32/fileshare.cc.
OnInfoMessage(const SessionMessage & msg)793 bool Session::OnInfoMessage(const SessionMessage& msg) {
794 SignalInfoMessage(this, CopyOfXmlChildren(msg.action_elem));
795 return true;
796 }
797
OnTerminateMessage(const SessionMessage & msg,MessageError * error)798 bool Session::OnTerminateMessage(const SessionMessage& msg,
799 MessageError* error) {
800 SessionTerminate term;
801 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
802 return false;
803
804 SignalReceivedTerminateReason(this, term.reason);
805 if (term.debug_reason != buzz::STR_EMPTY) {
806 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
807 }
808
809 SetState(STATE_RECEIVEDTERMINATE);
810 return true;
811 }
812
OnTransportInfoMessage(const SessionMessage & msg,MessageError * error)813 bool Session::OnTransportInfoMessage(const SessionMessage& msg,
814 MessageError* error) {
815 TransportInfos tinfos;
816 if (!ParseTransportInfos(msg.protocol, msg.action_elem,
817 initiator_description()->contents(),
818 GetTransportParsers(), &tinfos, error))
819 return false;
820
821 if (!OnRemoteCandidates(tinfos, error))
822 return false;
823
824 return true;
825 }
826
OnTransportAcceptMessage(const SessionMessage & msg,MessageError * error)827 bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
828 MessageError* error) {
829 // TODO: Currently here only for compatibility with
830 // Gingle 1.1 clients (notably, Google Voice).
831 return true;
832 }
833
OnNotifyMessage(const SessionMessage & msg,MessageError * error)834 bool Session::OnNotifyMessage(const SessionMessage& msg,
835 MessageError* error) {
836 SessionNotify notify;
837 if (!ParseSessionNotify(msg.action_elem, ¬ify, error)) {
838 return false;
839 }
840
841 SignalMediaSources(notify.nickname_to_sources);
842
843 return true;
844 }
845
OnUpdateMessage(const SessionMessage & msg,MessageError * error)846 bool Session::OnUpdateMessage(const SessionMessage& msg,
847 MessageError* error) {
848 SessionUpdate update;
849 if (!ParseSessionUpdate(msg.action_elem, &update, error)) {
850 return false;
851 }
852
853 // TODO: Process this message appropriately.
854
855 return true;
856 }
857
BareJidsEqual(const std::string & name1,const std::string & name2)858 bool BareJidsEqual(const std::string& name1,
859 const std::string& name2) {
860 buzz::Jid jid1(name1);
861 buzz::Jid jid2(name2);
862
863 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
864 }
865
OnRedirectError(const SessionRedirect & redirect,SessionError * error)866 bool Session::OnRedirectError(const SessionRedirect& redirect,
867 SessionError* error) {
868 MessageError message_error;
869 if (!CheckState(STATE_SENTINITIATE, &message_error)) {
870 return BadWrite(message_error.text, error);
871 }
872
873 if (!BareJidsEqual(remote_name_, redirect.target))
874 return BadWrite("Redirection not allowed: must be the same bare jid.",
875 error);
876
877 // When we receive a redirect, we point the session at the new JID
878 // and resend the candidates.
879 remote_name_ = redirect.target;
880 return (SendInitiateMessage(local_description(), error) &&
881 ResendAllTransportInfoMessages(error));
882 }
883
CheckState(State state,MessageError * error)884 bool Session::CheckState(State state, MessageError* error) {
885 ASSERT(state_ == state);
886 if (state_ != state) {
887 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
888 "message not allowed in current state",
889 error);
890 }
891 return true;
892 }
893
SetError(Error error)894 void Session::SetError(Error error) {
895 BaseSession::SetError(error);
896 if (error_ != ERROR_NONE)
897 signaling_thread_->Post(this, MSG_ERROR);
898 }
899
OnMessage(talk_base::Message * pmsg)900 void Session::OnMessage(talk_base::Message *pmsg) {
901 // preserve this because BaseSession::OnMessage may modify it
902 BaseSession::State orig_state = state_;
903
904 BaseSession::OnMessage(pmsg);
905
906 switch (pmsg->message_id) {
907 case MSG_STATE:
908 switch (orig_state) {
909 case STATE_SENTTERMINATE:
910 case STATE_RECEIVEDTERMINATE:
911 session_manager_->DestroySession(this);
912 break;
913
914 default:
915 // Explicitly ignoring some states here.
916 break;
917 }
918 break;
919 }
920 }
921
SendInitiateMessage(const SessionDescription * sdesc,SessionError * error)922 bool Session::SendInitiateMessage(const SessionDescription* sdesc,
923 SessionError* error) {
924 SessionInitiate init;
925 init.contents = sdesc->contents();
926 init.transports = GetEmptyTransportInfos(init.contents);
927 return SendMessage(ACTION_SESSION_INITIATE, init, error);
928 }
929
WriteSessionAction(SignalingProtocol protocol,const SessionInitiate & init,XmlElements * elems,WriteError * error)930 bool Session::WriteSessionAction(
931 SignalingProtocol protocol, const SessionInitiate& init,
932 XmlElements* elems, WriteError* error) {
933 ContentParserMap content_parsers = GetContentParsers();
934 TransportParserMap trans_parsers = GetTransportParsers();
935
936 return WriteSessionInitiate(protocol, init.contents, init.transports,
937 content_parsers, trans_parsers,
938 elems, error);
939 }
940
SetVideoView(const std::vector<VideoViewRequest> & view_requests)941 bool Session::SetVideoView(
942 const std::vector<VideoViewRequest>& view_requests) {
943 SessionView view;
944 SessionError error;
945
946 view.view_requests = view_requests;
947
948 return !SendViewMessage(view, &error);
949 }
950
SendAcceptMessage(const SessionDescription * sdesc,SessionError * error)951 bool Session::SendAcceptMessage(const SessionDescription* sdesc,
952 SessionError* error) {
953 XmlElements elems;
954 if (!WriteSessionAccept(current_protocol_,
955 sdesc->contents(),
956 GetEmptyTransportInfos(sdesc->contents()),
957 GetContentParsers(), GetTransportParsers(),
958 &elems, error)) {
959 return false;
960 }
961 return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
962 }
963
SendRejectMessage(const std::string & reason,SessionError * error)964 bool Session::SendRejectMessage(const std::string& reason,
965 SessionError* error) {
966 XmlElements elems;
967 return SendMessage(ACTION_SESSION_REJECT, elems, error);
968 }
969
970
SendTerminateMessage(const std::string & reason,SessionError * error)971 bool Session::SendTerminateMessage(const std::string& reason,
972 SessionError* error) {
973 SessionTerminate term(reason);
974 return SendMessage(ACTION_SESSION_TERMINATE, term, error);
975 }
976
WriteSessionAction(SignalingProtocol protocol,const SessionTerminate & term,XmlElements * elems,WriteError * error)977 bool Session::WriteSessionAction(SignalingProtocol protocol,
978 const SessionTerminate& term,
979 XmlElements* elems, WriteError* error) {
980 WriteSessionTerminate(protocol, term, elems);
981 return true;
982 }
983
SendTransportInfoMessage(const TransportInfo & tinfo,SessionError * error)984 bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
985 SessionError* error) {
986 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
987 }
988
WriteSessionAction(SignalingProtocol protocol,const TransportInfo & tinfo,XmlElements * elems,WriteError * error)989 bool Session::WriteSessionAction(SignalingProtocol protocol,
990 const TransportInfo& tinfo,
991 XmlElements* elems, WriteError* error) {
992 TransportInfos tinfos;
993 tinfos.push_back(tinfo);
994 TransportParserMap parsers = GetTransportParsers();
995
996 return WriteTransportInfos(protocol, tinfos, parsers,
997 elems, error);
998 }
999
SendViewMessage(const SessionView & view,SessionError * error)1000 bool Session::SendViewMessage(const SessionView& view, SessionError* error) {
1001 XmlElements elems;
1002 WriteSessionView(view, &elems);
1003 return SendMessage(ACTION_VIEW, elems, error);
1004 }
1005
ResendAllTransportInfoMessages(SessionError * error)1006 bool Session::ResendAllTransportInfoMessages(SessionError* error) {
1007 for (TransportMap::iterator iter = transports_.begin();
1008 iter != transports_.end(); ++iter) {
1009 TransportProxy* transproxy = iter->second;
1010 if (transproxy->sent_candidates().size() > 0) {
1011 if (!SendTransportInfoMessage(
1012 TransportInfo(
1013 transproxy->content_name(),
1014 transproxy->type(),
1015 transproxy->sent_candidates()),
1016 error)) {
1017 return false;
1018 }
1019 transproxy->ClearSentCandidates();
1020 }
1021 }
1022 return true;
1023 }
1024
SendMessage(ActionType type,const XmlElements & action_elems,SessionError * error)1025 bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1026 SessionError* error) {
1027 talk_base::scoped_ptr<buzz::XmlElement> stanza(
1028 new buzz::XmlElement(buzz::QN_IQ));
1029
1030 SessionMessage msg(current_protocol_, type, sid_, initiator_name_);
1031 msg.to = remote_name_;
1032 WriteSessionMessage(msg, action_elems, stanza.get());
1033
1034 SignalOutgoingMessage(this, stanza.get());
1035 return true;
1036 }
1037
1038 template <typename Action>
SendMessage(ActionType type,const Action & action,SessionError * error)1039 bool Session::SendMessage(ActionType type, const Action& action,
1040 SessionError* error) {
1041 talk_base::scoped_ptr<buzz::XmlElement> stanza(
1042 new buzz::XmlElement(buzz::QN_IQ));
1043 if (!WriteActionMessage(type, action, stanza.get(), error))
1044 return false;
1045
1046 SignalOutgoingMessage(this, stanza.get());
1047 return true;
1048 }
1049
1050 template <typename Action>
WriteActionMessage(ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1051 bool Session::WriteActionMessage(ActionType type, const Action& action,
1052 buzz::XmlElement* stanza,
1053 WriteError* error) {
1054 if (current_protocol_ == PROTOCOL_HYBRID) {
1055 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
1056 return false;
1057 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
1058 return false;
1059 } else {
1060 if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
1061 return false;
1062 }
1063 return true;
1064 }
1065
1066 template <typename Action>
WriteActionMessage(SignalingProtocol protocol,ActionType type,const Action & action,buzz::XmlElement * stanza,WriteError * error)1067 bool Session::WriteActionMessage(SignalingProtocol protocol,
1068 ActionType type, const Action& action,
1069 buzz::XmlElement* stanza, WriteError* error) {
1070 XmlElements action_elems;
1071 if (!WriteSessionAction(protocol, action, &action_elems, error))
1072 return false;
1073
1074 SessionMessage msg(protocol, type, sid_, initiator_name_);
1075 msg.to = remote_name_;
1076
1077 WriteSessionMessage(msg, action_elems, stanza);
1078 return true;
1079 }
1080
SendAcknowledgementMessage(const buzz::XmlElement * stanza)1081 void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
1082 talk_base::scoped_ptr<buzz::XmlElement> ack(
1083 new buzz::XmlElement(buzz::QN_IQ));
1084 ack->SetAttr(buzz::QN_TO, remote_name_);
1085 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
1086 ack->SetAttr(buzz::QN_TYPE, "result");
1087
1088 SignalOutgoingMessage(this, ack.get());
1089 }
1090
1091 } // namespace cricket
1092