• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/p2p/base/p2ptransportchannel.h"
29 
30 #include <set>
31 #include "talk/base/common.h"
32 #include "talk/base/logging.h"
33 #include "talk/p2p/base/common.h"
34 
35 namespace {
36 
37 // messages for queuing up work for ourselves
38 const uint32 MSG_SORT = 1;
39 const uint32 MSG_PING = 2;
40 const uint32 MSG_ALLOCATE = 3;
41 
42 // When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers)
43 // for pinging.  When the socket is writable, we will use only 1 Kbps because
44 // we don't want to degrade the quality on a modem.  These numbers should work
45 // well on a 28.8K modem, which is the slowest connection on which the voice
46 // quality is reasonable at all.
47 static const uint32 PING_PACKET_SIZE = 60 * 8;
48 static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000;  // 480ms
49 static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000;  // 50ms
50 
51 // If there is a current writable connection, then we will also try hard to
52 // make sure it is pinged at this rate.
53 static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900;  // 2*WRITABLE_DELAY - bit
54 
55 // The minimum improvement in RTT that justifies a switch.
56 static const double kMinImprovement = 10;
57 
58 // Amount of time that we wait when *losing* writability before we try doing
59 // another allocation.
60 static const int kAllocateDelay = 1 * 1000;  // 1 second
61 
62 // We will try creating a new allocator from scratch after a delay of this
63 // length without becoming writable (or timing out).
64 static const int kAllocatePeriod = 20 * 1000;  // 20 seconds
65 
GetOrigin(cricket::Port * port,cricket::Port * origin_port)66 cricket::Port::CandidateOrigin GetOrigin(cricket::Port* port,
67                                          cricket::Port* origin_port) {
68   if (!origin_port)
69     return cricket::Port::ORIGIN_MESSAGE;
70   else if (port == origin_port)
71     return cricket::Port::ORIGIN_THIS_PORT;
72   else
73     return cricket::Port::ORIGIN_OTHER_PORT;
74 }
75 
76 // Compares two connections based only on static information about them.
CompareConnectionCandidates(cricket::Connection * a,cricket::Connection * b)77 int CompareConnectionCandidates(cricket::Connection* a,
78                                 cricket::Connection* b) {
79   // Combine local and remote preferences
80   ASSERT(a->local_candidate().preference() == a->port()->preference());
81   ASSERT(b->local_candidate().preference() == b->port()->preference());
82   double a_pref = a->local_candidate().preference()
83                 * a->remote_candidate().preference();
84   double b_pref = b->local_candidate().preference()
85                 * b->remote_candidate().preference();
86 
87   // Now check combined preferences. Lower values get sorted last.
88   if (a_pref > b_pref)
89     return 1;
90   if (a_pref < b_pref)
91     return -1;
92 
93   return 0;
94 }
95 
96 // Compare two connections based on their writability and static preferences.
CompareConnections(cricket::Connection * a,cricket::Connection * b)97 int CompareConnections(cricket::Connection *a, cricket::Connection *b) {
98   // Sort based on write-state.  Better states have lower values.
99   if (a->write_state() < b->write_state())
100     return 1;
101   if (a->write_state() > b->write_state())
102     return -1;
103 
104   // Compare the candidate information.
105   return CompareConnectionCandidates(a, b);
106 }
107 
108 // Wraps the comparison connection into a less than operator that puts higher
109 // priority writable connections first.
110 class ConnectionCompare {
111  public:
operator ()(const cricket::Connection * ca,const cricket::Connection * cb)112   bool operator()(const cricket::Connection *ca,
113                   const cricket::Connection *cb) {
114     cricket::Connection* a = const_cast<cricket::Connection*>(ca);
115     cricket::Connection* b = const_cast<cricket::Connection*>(cb);
116 
117     // Compare first on writability and static preferences.
118     int cmp = CompareConnections(a, b);
119     if (cmp > 0)
120       return true;
121     if (cmp < 0)
122       return false;
123 
124     // Otherwise, sort based on latency estimate.
125     return a->rtt() < b->rtt();
126 
127     // Should we bother checking for the last connection that last received
128     // data? It would help rendezvous on the connection that is also receiving
129     // packets.
130     //
131     // TODO: Yes we should definitely do this.  The TCP protocol gains
132     // efficiency by being used bidirectionally, as opposed to two separate
133     // unidirectional streams.  This test should probably occur before
134     // comparison of local prefs (assuming combined prefs are the same).  We
135     // need to be careful though, not to bounce back and forth with both sides
136     // trying to rendevous with the other.
137   }
138 };
139 
140 // Determines whether we should switch between two connections, based first on
141 // static preferences and then (if those are equal) on latency estimates.
ShouldSwitch(cricket::Connection * a_conn,cricket::Connection * b_conn)142 bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) {
143   if (a_conn == b_conn)
144     return false;
145 
146   if (!a_conn || !b_conn)  // don't think the latter should happen
147     return true;
148 
149   int prefs_cmp = CompareConnections(a_conn, b_conn);
150   if (prefs_cmp < 0)
151     return true;
152   if (prefs_cmp > 0)
153     return false;
154 
155   return b_conn->rtt() <= a_conn->rtt() + kMinImprovement;
156 }
157 
158 }  // unnamed namespace
159 
160 namespace cricket {
161 
P2PTransportChannel(const std::string & name,const std::string & content_type,P2PTransport * transport,PortAllocator * allocator)162 P2PTransportChannel::P2PTransportChannel(const std::string &name,
163                                          const std::string &content_type,
164                                          P2PTransport* transport,
165                                          PortAllocator *allocator) :
166     TransportChannelImpl(name, content_type),
167     transport_(transport),
168     allocator_(allocator),
169     worker_thread_(talk_base::Thread::Current()),
170     waiting_for_signaling_(false),
171     error_(0),
172     best_connection_(NULL),
173     pinging_started_(false),
174     sort_dirty_(false),
175     was_writable_(false),
176     was_timed_out_(true) {
177 }
178 
~P2PTransportChannel()179 P2PTransportChannel::~P2PTransportChannel() {
180   ASSERT(worker_thread_ == talk_base::Thread::Current());
181 
182   for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
183     delete allocator_sessions_[i];
184 }
185 
186 // Add the allocator session to our list so that we know which sessions
187 // are still active.
AddAllocatorSession(PortAllocatorSession * session)188 void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) {
189   session->set_generation(static_cast<uint32>(allocator_sessions_.size()));
190   allocator_sessions_.push_back(session);
191 
192   // We now only want to apply new candidates that we receive to the ports
193   // created by this new session because these are replacing those of the
194   // previous sessions.
195   ports_.clear();
196 
197   session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady);
198   session->SignalCandidatesReady.connect(
199       this, &P2PTransportChannel::OnCandidatesReady);
200   session->GetInitialPorts();
201   if (pinging_started_)
202     session->StartGetAllPorts();
203 }
204 
205 // Go into the state of processing candidates, and running in general
Connect()206 void P2PTransportChannel::Connect() {
207   ASSERT(worker_thread_ == talk_base::Thread::Current());
208 
209   // Kick off an allocator session
210   Allocate();
211 
212   // Start pinging as the ports come in.
213   thread()->Post(this, MSG_PING);
214 }
215 
216 // Reset the socket, clear up any previous allocations and start over
Reset()217 void P2PTransportChannel::Reset() {
218   ASSERT(worker_thread_ == talk_base::Thread::Current());
219 
220   // Get rid of all the old allocators.  This should clean up everything.
221   for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
222     delete allocator_sessions_[i];
223 
224   allocator_sessions_.clear();
225   ports_.clear();
226   connections_.clear();
227   best_connection_ = NULL;
228 
229   // Forget about all of the candidates we got before.
230   remote_candidates_.clear();
231 
232   // Revert to the initial state.
233   set_readable(false);
234   set_writable(false);
235 
236   // Reinitialize the rest of our state.
237   waiting_for_signaling_ = false;
238   pinging_started_ = false;
239   sort_dirty_ = false;
240   was_writable_ = false;
241   was_timed_out_ = true;
242 
243   // If we allocated before, start a new one now.
244   if (transport_->connect_requested())
245     Allocate();
246 
247   // Start pinging as the ports come in.
248   thread()->Clear(this);
249   thread()->Post(this, MSG_PING);
250 }
251 
252 // A new port is available, attempt to make connections for it
OnPortReady(PortAllocatorSession * session,Port * port)253 void P2PTransportChannel::OnPortReady(PortAllocatorSession *session,
254                                       Port* port) {
255   ASSERT(worker_thread_ == talk_base::Thread::Current());
256 
257   // Set in-effect options on the new port
258   for (OptionMap::const_iterator it = options_.begin();
259        it != options_.end();
260        ++it) {
261     int val = port->SetOption(it->first, it->second);
262     if (val < 0) {
263       LOG_J(LS_WARNING, port) << "SetOption(" << it->first
264                               << ", " << it->second
265                               << ") failed: " << port->GetError();
266     }
267   }
268 
269   // Remember the ports and candidates, and signal that candidates are ready.
270   // The session will handle this, and send an initiate/accept/modify message
271   // if one is pending.
272 
273   ports_.push_back(port);
274   port->SignalUnknownAddress.connect(
275       this, &P2PTransportChannel::OnUnknownAddress);
276   port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed);
277 
278   // Attempt to create a connection from this new port to all of the remote
279   // candidates that we were given so far.
280 
281   std::vector<RemoteCandidate>::iterator iter;
282   for (iter = remote_candidates_.begin(); iter != remote_candidates_.end();
283        ++iter)
284     CreateConnection(port, *iter, iter->origin_port(), false);
285 
286   SortConnections();
287 }
288 
289 // A new candidate is available, let listeners know
OnCandidatesReady(PortAllocatorSession * session,const std::vector<Candidate> & candidates)290 void P2PTransportChannel::OnCandidatesReady(
291     PortAllocatorSession *session, const std::vector<Candidate>& candidates) {
292   for (size_t i = 0; i < candidates.size(); ++i) {
293     SignalCandidateReady(this, candidates[i]);
294   }
295 }
296 
297 // Handle stun packets
OnUnknownAddress(Port * port,const talk_base::SocketAddress & address,StunMessage * stun_msg,const std::string & remote_username)298 void P2PTransportChannel::OnUnknownAddress(
299     Port *port, const talk_base::SocketAddress &address, StunMessage *stun_msg,
300     const std::string &remote_username) {
301   ASSERT(worker_thread_ == talk_base::Thread::Current());
302 
303   // Port has received a valid stun packet from an address that no Connection
304   // is currently available for. See if the remote user name is in the remote
305   // candidate list. If it isn't return error to the stun request.
306 
307   const Candidate *candidate = NULL;
308   std::vector<RemoteCandidate>::iterator it;
309   for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) {
310     if ((*it).username() == remote_username) {
311       candidate = &(*it);
312       break;
313     }
314   }
315   if (candidate == NULL) {
316     // Don't know about this username, the request is bogus
317     // This sometimes happens if a binding response comes in before the ACCEPT
318     // message.  It is totally valid; the retry state machine will try again.
319 
320     port->SendBindingErrorResponse(stun_msg, address,
321         STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS);
322     delete stun_msg;
323     return;
324   }
325 
326   // Check for connectivity to this address. Create connections
327   // to this address across all local ports. First, add this as a new remote
328   // address
329 
330   Candidate new_remote_candidate = *candidate;
331   new_remote_candidate.set_address(address);
332   // new_remote_candidate.set_protocol(port->protocol());
333 
334   // This remote username exists. Now create connections using this candidate,
335   // and resort
336 
337   if (CreateConnections(new_remote_candidate, port, true)) {
338     // Send the pinger a successful stun response.
339     port->SendBindingResponse(stun_msg, address);
340 
341     // Update the list of connections since we just added another.  We do this
342     // after sending the response since it could (in principle) delete the
343     // connection in question.
344     SortConnections();
345   } else {
346     // Hopefully this won't occur, because changing a destination address
347     // shouldn't cause a new connection to fail
348     ASSERT(false);
349     port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR,
350         STUN_ERROR_REASON_SERVER_ERROR);
351   }
352 
353   delete stun_msg;
354 }
355 
OnCandidate(const Candidate & candidate)356 void P2PTransportChannel::OnCandidate(const Candidate& candidate) {
357   ASSERT(worker_thread_ == talk_base::Thread::Current());
358 
359   // Create connections to this remote candidate.
360   CreateConnections(candidate, NULL, false);
361 
362   // Resort the connections list, which may have new elements.
363   SortConnections();
364 }
365 
366 // Creates connections from all of the ports that we care about to the given
367 // remote candidate.  The return value is true if we created a connection from
368 // the origin port.
CreateConnections(const Candidate & remote_candidate,Port * origin_port,bool readable)369 bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate,
370                                             Port* origin_port,
371                                             bool readable) {
372   ASSERT(worker_thread_ == talk_base::Thread::Current());
373 
374   // Add a new connection for this candidate to every port that allows such a
375   // connection (i.e., if they have compatible protocols) and that does not
376   // already have a connection to an equivalent candidate.  We must be careful
377   // to make sure that the origin port is included, even if it was pruned,
378   // since that may be the only port that can create this connection.
379 
380   bool created = false;
381 
382   std::vector<Port *>::reverse_iterator it;
383   for (it = ports_.rbegin(); it != ports_.rend(); ++it) {
384     if (CreateConnection(*it, remote_candidate, origin_port, readable)) {
385       if (*it == origin_port)
386         created = true;
387     }
388   }
389 
390   if ((origin_port != NULL) &&
391       std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) {
392     if (CreateConnection(origin_port, remote_candidate, origin_port, readable))
393       created = true;
394   }
395 
396   // Remember this remote candidate so that we can add it to future ports.
397   RememberRemoteCandidate(remote_candidate, origin_port);
398 
399   return created;
400 }
401 
402 // Setup a connection object for the local and remote candidate combination.
403 // And then listen to connection object for changes.
CreateConnection(Port * port,const Candidate & remote_candidate,Port * origin_port,bool readable)404 bool P2PTransportChannel::CreateConnection(Port* port,
405                                            const Candidate& remote_candidate,
406                                            Port* origin_port,
407                                            bool readable) {
408   // Look for an existing connection with this remote address.  If one is not
409   // found, then we can create a new connection for this address.
410   Connection* connection = port->GetConnection(remote_candidate.address());
411   if (connection != NULL) {
412     // It is not legal to try to change any of the parameters of an existing
413     // connection; however, the other side can send a duplicate candidate.
414     if (!remote_candidate.IsEquivalent(connection->remote_candidate())) {
415       LOG(INFO) << "Attempt to change a remote candidate";
416       return false;
417     }
418   } else {
419     Port::CandidateOrigin origin = GetOrigin(port, origin_port);
420     connection = port->CreateConnection(remote_candidate, origin);
421     if (!connection)
422       return false;
423 
424     connections_.push_back(connection);
425     connection->SignalReadPacket.connect(
426         this, &P2PTransportChannel::OnReadPacket);
427     connection->SignalStateChange.connect(
428         this, &P2PTransportChannel::OnConnectionStateChange);
429     connection->SignalDestroyed.connect(
430         this, &P2PTransportChannel::OnConnectionDestroyed);
431 
432     LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", ("
433                          << connections_.size() << " total)";
434   }
435 
436   // If we are readable, it is because we are creating this in response to a
437   // ping from the other side.  This will cause the state to become readable.
438   if (readable)
439     connection->ReceivedPing();
440 
441   return true;
442 }
443 
444 // Maintain our remote candidate list, adding this new remote one.
RememberRemoteCandidate(const Candidate & remote_candidate,Port * origin_port)445 void P2PTransportChannel::RememberRemoteCandidate(
446     const Candidate& remote_candidate, Port* origin_port) {
447   // Remove any candidates whose generation is older than this one.  The
448   // presence of a new generation indicates that the old ones are not useful.
449   uint32 i = 0;
450   while (i < remote_candidates_.size()) {
451     if (remote_candidates_[i].generation() < remote_candidate.generation()) {
452       LOG(INFO) << "Pruning candidate from old generation: "
453                 << remote_candidates_[i].address().ToString();
454       remote_candidates_.erase(remote_candidates_.begin() + i);
455     } else {
456       i += 1;
457     }
458   }
459 
460   // Make sure this candidate is not a duplicate.
461   for (uint32 i = 0; i < remote_candidates_.size(); ++i) {
462     if (remote_candidates_[i].IsEquivalent(remote_candidate)) {
463       LOG(INFO) << "Duplicate candidate: "
464                 << remote_candidate.address().ToString();
465       return;
466     }
467   }
468 
469   // Try this candidate for all future ports.
470   remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port));
471 
472   // We have some candidates from the other side, we are now serious about
473   // this connection.  Let's do the StartGetAllPorts thing.
474   if (!pinging_started_) {
475     pinging_started_ = true;
476     for (size_t i = 0; i < allocator_sessions_.size(); ++i) {
477       if (!allocator_sessions_[i]->IsGettingAllPorts())
478         allocator_sessions_[i]->StartGetAllPorts();
479     }
480   }
481 }
482 
483 // Send data to the other side, using our best connection
SendPacket(const char * data,size_t len)484 int P2PTransportChannel::SendPacket(const char *data, size_t len) {
485   // This can get called on any thread that is convenient to write from!
486   if (best_connection_ == NULL) {
487     error_ = EWOULDBLOCK;
488     return SOCKET_ERROR;
489   }
490   int sent = best_connection_->Send(data, len);
491   if (sent <= 0) {
492     ASSERT(sent < 0);
493     error_ = best_connection_->GetError();
494   }
495   return sent;
496 }
497 
498 // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending)
Allocate()499 void P2PTransportChannel::Allocate() {
500   CancelPendingAllocate();
501   // Time for a new allocator, lets make sure we have a signalling channel
502   // to communicate candidates through first.
503   waiting_for_signaling_ = true;
504   SignalRequestSignaling();
505 }
506 
507 // Cancels the pending allocate, if any.
CancelPendingAllocate()508 void P2PTransportChannel::CancelPendingAllocate() {
509   thread()->Clear(this, MSG_ALLOCATE);
510 }
511 
512 // Monitor connection states
UpdateConnectionStates()513 void P2PTransportChannel::UpdateConnectionStates() {
514   uint32 now = talk_base::Time();
515 
516   // We need to copy the list of connections since some may delete themselves
517   // when we call UpdateState.
518   for (uint32 i = 0; i < connections_.size(); ++i)
519     connections_[i]->UpdateState(now);
520 }
521 
522 // Prepare for best candidate sorting
RequestSort()523 void P2PTransportChannel::RequestSort() {
524   if (!sort_dirty_) {
525     worker_thread_->Post(this, MSG_SORT);
526     sort_dirty_ = true;
527   }
528 }
529 
530 // Sort the available connections to find the best one.  We also monitor
531 // the number of available connections and the current state so that we
532 // can possibly kick off more allocators (for more connections).
SortConnections()533 void P2PTransportChannel::SortConnections() {
534   ASSERT(worker_thread_ == talk_base::Thread::Current());
535 
536   // Make sure the connection states are up-to-date since this affects how they
537   // will be sorted.
538   UpdateConnectionStates();
539 
540   // Any changes after this point will require a re-sort.
541   sort_dirty_ = false;
542 
543   // Get a list of the networks that we are using.
544   std::set<talk_base::Network*> networks;
545   for (uint32 i = 0; i < connections_.size(); ++i)
546     networks.insert(connections_[i]->port()->network());
547 
548   // Find the best alternative connection by sorting.  It is important to note
549   // that amongst equal preference, writable connections, this will choose the
550   // one whose estimated latency is lowest.  So it is the only one that we
551   // need to consider switching to.
552 
553   ConnectionCompare cmp;
554   std::stable_sort(connections_.begin(), connections_.end(), cmp);
555   Connection* top_connection = NULL;
556   if (connections_.size() > 0)
557     top_connection = connections_[0];
558 
559   // If necessary, switch to the new choice.
560   if (ShouldSwitch(best_connection_, top_connection))
561     SwitchBestConnectionTo(top_connection);
562 
563   // We can prune any connection for which there is a writable connection on
564   // the same network with better or equal prefences.  We leave those with
565   // better preference just in case they become writable later (at which point,
566   // we would prune out the current best connection).  We leave connections on
567   // other networks because they may not be using the same resources and they
568   // may represent very distinct paths over which we can switch.
569   std::set<talk_base::Network*>::iterator network;
570   for (network = networks.begin(); network != networks.end(); ++network) {
571     Connection* primier = GetBestConnectionOnNetwork(*network);
572     if (!primier || (primier->write_state() != Connection::STATE_WRITABLE))
573       continue;
574 
575     for (uint32 i = 0; i < connections_.size(); ++i) {
576       if ((connections_[i] != primier) &&
577           (connections_[i]->port()->network() == *network) &&
578           (CompareConnectionCandidates(primier, connections_[i]) >= 0)) {
579         connections_[i]->Prune();
580       }
581     }
582   }
583 
584   // Count the number of connections in the various states.
585 
586   int writable = 0;
587   int write_connect = 0;
588   int write_timeout = 0;
589 
590   for (uint32 i = 0; i < connections_.size(); ++i) {
591     switch (connections_[i]->write_state()) {
592     case Connection::STATE_WRITABLE:
593       ++writable;
594       break;
595     case Connection::STATE_WRITE_CONNECT:
596       ++write_connect;
597       break;
598     case Connection::STATE_WRITE_TIMEOUT:
599       ++write_timeout;
600       break;
601     default:
602       ASSERT(false);
603     }
604   }
605 
606   if (writable > 0) {
607     HandleWritable();
608   } else if (write_connect > 0) {
609     HandleNotWritable();
610   } else {
611     HandleAllTimedOut();
612   }
613 
614   // Update the state of this channel.  This method is called whenever the
615   // state of any connection changes, so this is a good place to do this.
616   UpdateChannelState();
617 
618   // Notify of connection state change
619   SignalConnectionMonitor(this);
620 }
621 
622 // Track the best connection, and let listeners know
SwitchBestConnectionTo(Connection * conn)623 void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) {
624   // Note: if conn is NULL, the previous best_connection_ has been destroyed,
625   // so don't use it.
626   // use it.
627   Connection* old_best_connection = best_connection_;
628   best_connection_ = conn;
629   if (best_connection_) {
630     if (old_best_connection) {
631       LOG_J(LS_INFO, this) << "Previous best connection: "
632                            << old_best_connection->ToString();
633     }
634     LOG_J(LS_INFO, this) << "New best connection: "
635                          << best_connection_->ToString();
636     SignalRouteChange(this, best_connection_->remote_candidate().address());
637   } else {
638     LOG_J(LS_INFO, this) << "No best connection";
639   }
640 }
641 
UpdateChannelState()642 void P2PTransportChannel::UpdateChannelState() {
643   // The Handle* functions already set the writable state.  We'll just double-
644   // check it here.
645   bool writable = ((best_connection_ != NULL)  &&
646       (best_connection_->write_state() ==
647       Connection::STATE_WRITABLE));
648   ASSERT(writable == this->writable());
649   if (writable != this->writable())
650     LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch";
651 
652   bool readable = false;
653   for (uint32 i = 0; i < connections_.size(); ++i) {
654     if (connections_[i]->read_state() == Connection::STATE_READABLE)
655       readable = true;
656   }
657   set_readable(readable);
658 }
659 
660 // We checked the status of our connections and we had at least one that
661 // was writable, go into the writable state.
HandleWritable()662 void P2PTransportChannel::HandleWritable() {
663   //
664   // One or more connections writable!
665   //
666   if (!writable()) {
667     for (uint32 i = 0; i < allocator_sessions_.size(); ++i) {
668       if (allocator_sessions_[i]->IsGettingAllPorts()) {
669         allocator_sessions_[i]->StopGetAllPorts();
670       }
671     }
672 
673     // Stop further allocations.
674     CancelPendingAllocate();
675   }
676 
677   // We're writable, obviously we aren't timed out
678   was_writable_ = true;
679   was_timed_out_ = false;
680   set_writable(true);
681 }
682 
683 // We checked the status of our connections and we didn't have any that
684 // were writable, go into the connecting state (kick off a new allocator
685 // session).
HandleNotWritable()686 void P2PTransportChannel::HandleNotWritable() {
687   //
688   // No connections are writable but not timed out!
689   //
690   if (was_writable_) {
691     // If we were writable, let's kick off an allocator session immediately
692     was_writable_ = false;
693     Allocate();
694   }
695 
696   // We were connecting, obviously not ALL timed out.
697   was_timed_out_ = false;
698   set_writable(false);
699 }
700 
701 // We checked the status of our connections and not only weren't they writable
702 // but they were also timed out, we really need a new allocator.
HandleAllTimedOut()703 void P2PTransportChannel::HandleAllTimedOut() {
704   //
705   // No connections... all are timed out!
706   //
707   if (!was_timed_out_) {
708     // We weren't timed out before, so kick off an allocator now (we'll still
709     // be in the fully timed out state until the allocator actually gives back
710     // new ports)
711     Allocate();
712   }
713 
714   // NOTE: we start was_timed_out_ in the true state so that we don't get
715   // another allocator created WHILE we are in the process of building up
716   // our first allocator.
717   was_timed_out_ = true;
718   was_writable_ = false;
719   set_writable(false);
720 }
721 
722 // If we have a best connection, return it, otherwise return top one in the
723 // list (later we will mark it best).
GetBestConnectionOnNetwork(talk_base::Network * network)724 Connection* P2PTransportChannel::GetBestConnectionOnNetwork(
725     talk_base::Network* network) {
726   // If the best connection is on this network, then it wins.
727   if (best_connection_ && (best_connection_->port()->network() == network))
728     return best_connection_;
729 
730   // Otherwise, we return the top-most in sorted order.
731   for (uint32 i = 0; i < connections_.size(); ++i) {
732     if (connections_[i]->port()->network() == network)
733       return connections_[i];
734   }
735 
736   return NULL;
737 }
738 
739 // Handle any queued up requests
OnMessage(talk_base::Message * pmsg)740 void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) {
741   if (pmsg->message_id == MSG_SORT)
742     OnSort();
743   else if (pmsg->message_id == MSG_PING)
744     OnPing();
745   else if (pmsg->message_id == MSG_ALLOCATE)
746     Allocate();
747   else
748     ASSERT(false);
749 }
750 
751 // Handle queued up sort request
OnSort()752 void P2PTransportChannel::OnSort() {
753   // Resort the connections based on the new statistics.
754   SortConnections();
755 }
756 
757 // Handle queued up ping request
OnPing()758 void P2PTransportChannel::OnPing() {
759   // Make sure the states of the connections are up-to-date (since this affects
760   // which ones are pingable).
761   UpdateConnectionStates();
762 
763   // Find the oldest pingable connection and have it do a ping.
764   Connection* conn = FindNextPingableConnection();
765   if (conn)
766     conn->Ping(talk_base::Time());
767 
768   // Post ourselves a message to perform the next ping.
769   uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY;
770   thread()->PostDelayed(delay, this, MSG_PING);
771 }
772 
773 // Is the connection in a state for us to even consider pinging the other side?
IsPingable(Connection * conn)774 bool P2PTransportChannel::IsPingable(Connection* conn) {
775   // An unconnected connection cannot be written to at all, so pinging is out
776   // of the question.
777   if (!conn->connected())
778     return false;
779 
780   if (writable()) {
781     // If we are writable, then we only want to ping connections that could be
782     // better than this one, i.e., the ones that were not pruned.
783     return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT);
784   } else {
785     // If we are not writable, then we need to try everything that might work.
786     // This includes both connections that do not have write timeout as well as
787     // ones that do not have read timeout.  A connection could be readable but
788     // be in write-timeout if we pruned it before.  Since the other side is
789     // still pinging it, it very well might still work.
790     return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) ||
791            (conn->read_state() != Connection::STATE_READ_TIMEOUT);
792   }
793 }
794 
795 // Returns the next pingable connection to ping.  This will be the oldest
796 // pingable connection unless we have a writable connection that is past the
797 // maximum acceptable ping delay.
FindNextPingableConnection()798 Connection* P2PTransportChannel::FindNextPingableConnection() {
799   uint32 now = talk_base::Time();
800   if (best_connection_ &&
801       (best_connection_->write_state() == Connection::STATE_WRITABLE) &&
802       (best_connection_->last_ping_sent()
803        + MAX_CURRENT_WRITABLE_DELAY <= now)) {
804     return best_connection_;
805   }
806 
807   Connection* oldest_conn = NULL;
808   uint32 oldest_time = 0xFFFFFFFF;
809   for (uint32 i = 0; i < connections_.size(); ++i) {
810     if (IsPingable(connections_[i])) {
811       if (connections_[i]->last_ping_sent() < oldest_time) {
812         oldest_time = connections_[i]->last_ping_sent();
813         oldest_conn = connections_[i];
814       }
815     }
816   }
817   return oldest_conn;
818 }
819 
820 // return the number of "pingable" connections
NumPingableConnections()821 uint32 P2PTransportChannel::NumPingableConnections() {
822   uint32 count = 0;
823   for (uint32 i = 0; i < connections_.size(); ++i) {
824     if (IsPingable(connections_[i]))
825       count += 1;
826   }
827   return count;
828 }
829 
830 // When a connection's state changes, we need to figure out who to use as
831 // the best connection again.  It could have become usable, or become unusable.
OnConnectionStateChange(Connection * connection)832 void P2PTransportChannel::OnConnectionStateChange(Connection *connection) {
833   ASSERT(worker_thread_ == talk_base::Thread::Current());
834 
835   // We have to unroll the stack before doing this because we may be changing
836   // the state of connections while sorting.
837   RequestSort();
838 }
839 
840 // When a connection is removed, edit it out, and then update our best
841 // connection.
OnConnectionDestroyed(Connection * connection)842 void P2PTransportChannel::OnConnectionDestroyed(Connection *connection) {
843   ASSERT(worker_thread_ == talk_base::Thread::Current());
844 
845   // Note: the previous best_connection_ may be destroyed by now, so don't
846   // use it.
847 
848   // Remove this connection from the list.
849   std::vector<Connection*>::iterator iter =
850       std::find(connections_.begin(), connections_.end(), connection);
851   ASSERT(iter != connections_.end());
852   connections_.erase(iter);
853 
854   LOG_J(LS_INFO, this) << "Removed connection ("
855     << static_cast<int>(connections_.size()) << " remaining)";
856 
857   // If this is currently the best connection, then we need to pick a new one.
858   // The call to SortConnections will pick a new one.  It looks at the current
859   // best connection in order to avoid switching between fairly similar ones.
860   // Since this connection is no longer an option, we can just set best to NULL
861   // and re-choose a best assuming that there was no best connection.
862   if (best_connection_ == connection) {
863     SwitchBestConnectionTo(NULL);
864     RequestSort();
865   }
866 }
867 
868 // When a port is destroyed remove it from our list of ports to use for
869 // connection attempts.
OnPortDestroyed(Port * port)870 void P2PTransportChannel::OnPortDestroyed(Port* port) {
871   ASSERT(worker_thread_ == talk_base::Thread::Current());
872 
873   // Remove this port from the list (if we didn't drop it already).
874   std::vector<Port*>::iterator iter =
875       std::find(ports_.begin(), ports_.end(), port);
876   if (iter != ports_.end())
877     ports_.erase(iter);
878 
879   LOG(INFO) << "Removed port from p2p socket: "
880             << static_cast<int>(ports_.size()) << " remaining";
881 }
882 
883 // We data is available, let listeners know
OnReadPacket(Connection * connection,const char * data,size_t len)884 void P2PTransportChannel::OnReadPacket(Connection *connection,
885                                        const char *data, size_t len) {
886   ASSERT(worker_thread_ == talk_base::Thread::Current());
887 
888   // Let the client know of an incoming packet
889 
890   SignalReadPacket(this, data, len);
891 }
892 
893 // Set options on ourselves is simply setting options on all of our available
894 // port objects.
SetOption(talk_base::Socket::Option opt,int value)895 int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) {
896   OptionMap::iterator it = options_.find(opt);
897   if (it == options_.end()) {
898     options_.insert(std::make_pair(opt, value));
899   } else if (it->second == value) {
900     return 0;
901   } else {
902     it->second = value;
903   }
904 
905   for (uint32 i = 0; i < ports_.size(); ++i) {
906     int val = ports_[i]->SetOption(opt, value);
907     if (val < 0) {
908       // Because this also occurs deferred, probably no point in reporting an
909       // error
910       LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: "
911                    << ports_[i]->GetError();
912     }
913   }
914   return 0;
915 }
916 
917 // When the signalling channel is ready, we can really kick off the allocator
OnSignalingReady()918 void P2PTransportChannel::OnSignalingReady() {
919   if (waiting_for_signaling_) {
920     waiting_for_signaling_ = false;
921     AddAllocatorSession(allocator_->CreateSession(name(), content_type()));
922     thread()->PostDelayed(kAllocatePeriod, this, MSG_ALLOCATE);
923   }
924 }
925 
926 }  // namespace cricket
927