1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include "talk/p2p/base/port.h"
29
30 #include <algorithm>
31 #include <vector>
32
33 #include "talk/base/helpers.h"
34 #include "talk/base/logging.h"
35 #include "talk/base/scoped_ptr.h"
36 #include "talk/base/stringutils.h"
37 #include "talk/p2p/base/common.h"
38
39 namespace {
40
41 // The length of time we wait before timing out readability on a connection.
42 const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000; // 30 seconds
43
44 // The length of time we wait before timing out writability on a connection.
45 const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000; // 15 seconds
46
47 // The length of time we wait before we become unwritable.
48 const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000; // 5 seconds
49
50 // The number of pings that must fail to respond before we become unwritable.
51 const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5;
52
53 // This is the length of time that we wait for a ping response to come back.
54 const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000; // 5 seconds
55
56 // Determines whether we have seen at least the given maximum number of
57 // pings fail to have a response.
TooManyFailures(const std::vector<uint32> & pings_since_last_response,uint32 maximum_failures,uint32 rtt_estimate,uint32 now)58 inline bool TooManyFailures(
59 const std::vector<uint32>& pings_since_last_response,
60 uint32 maximum_failures,
61 uint32 rtt_estimate,
62 uint32 now) {
63
64 // If we haven't sent that many pings, then we can't have failed that many.
65 if (pings_since_last_response.size() < maximum_failures)
66 return false;
67
68 // Check if the window in which we would expect a response to the ping has
69 // already elapsed.
70 return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now;
71 }
72
73 // Determines whether we have gone too long without seeing any response.
TooLongWithoutResponse(const std::vector<uint32> & pings_since_last_response,uint32 maximum_time,uint32 now)74 inline bool TooLongWithoutResponse(
75 const std::vector<uint32>& pings_since_last_response,
76 uint32 maximum_time,
77 uint32 now) {
78
79 if (pings_since_last_response.size() == 0)
80 return false;
81
82 return pings_since_last_response[0] + maximum_time < now;
83 }
84
85 // We will restrict RTT estimates (when used for determining state) to be
86 // within a reasonable range.
87 const uint32 MINIMUM_RTT = 100; // 0.1 seconds
88 const uint32 MAXIMUM_RTT = 3000; // 3 seconds
89
90 // When we don't have any RTT data, we have to pick something reasonable. We
91 // use a large value just in case the connection is really slow.
92 const uint32 DEFAULT_RTT = MAXIMUM_RTT;
93
94 // Computes our estimate of the RTT given the current estimate.
ConservativeRTTEstimate(uint32 rtt)95 inline uint32 ConservativeRTTEstimate(uint32 rtt) {
96 return talk_base::_max(MINIMUM_RTT, talk_base::_min(MAXIMUM_RTT, 2 * rtt));
97 }
98
99 // Weighting of the old rtt value to new data.
100 const int RTT_RATIO = 3; // 3 : 1
101
102 // The delay before we begin checking if this port is useless.
103 const int kPortTimeoutDelay = 30 * 1000; // 30 seconds
104
105 const uint32 MSG_CHECKTIMEOUT = 1;
106 const uint32 MSG_DELETE = 1;
107 }
108
109 namespace cricket {
110
111 static const char* const PROTO_NAMES[] = { "udp", "tcp", "ssltcp" };
112
ProtoToString(ProtocolType proto)113 const char* ProtoToString(ProtocolType proto) {
114 return PROTO_NAMES[proto];
115 }
116
StringToProto(const char * value,ProtocolType * proto)117 bool StringToProto(const char* value, ProtocolType* proto) {
118 for (size_t i = 0; i <= PROTO_LAST; ++i) {
119 if (strcmp(PROTO_NAMES[i], value) == 0) {
120 *proto = static_cast<ProtocolType>(i);
121 return true;
122 }
123 }
124 return false;
125 }
126
Port(talk_base::Thread * thread,const std::string & type,talk_base::PacketSocketFactory * factory,talk_base::Network * network,uint32 ip,int min_port,int max_port)127 Port::Port(talk_base::Thread* thread, const std::string& type,
128 talk_base::PacketSocketFactory* factory, talk_base::Network* network,
129 uint32 ip, int min_port, int max_port)
130 : thread_(thread),
131 factory_(factory),
132 type_(type),
133 network_(network),
134 ip_(ip),
135 min_port_(min_port),
136 max_port_(max_port),
137 preference_(-1),
138 lifetime_(LT_PRESTART),
139 enable_port_packets_(false) {
140 ASSERT(factory_ != NULL);
141
142 set_username_fragment(talk_base::CreateRandomString(16));
143 set_password(talk_base::CreateRandomString(16));
144 LOG_J(LS_INFO, this) << "Port created";
145 }
146
~Port()147 Port::~Port() {
148 // Delete all of the remaining connections. We copy the list up front
149 // because each deletion will cause it to be modified.
150
151 std::vector<Connection*> list;
152
153 AddressMap::iterator iter = connections_.begin();
154 while (iter != connections_.end()) {
155 list.push_back(iter->second);
156 ++iter;
157 }
158
159 for (uint32 i = 0; i < list.size(); i++)
160 delete list[i];
161 }
162
GetConnection(const talk_base::SocketAddress & remote_addr)163 Connection* Port::GetConnection(const talk_base::SocketAddress& remote_addr) {
164 AddressMap::const_iterator iter = connections_.find(remote_addr);
165 if (iter != connections_.end())
166 return iter->second;
167 else
168 return NULL;
169 }
170
AddAddress(const talk_base::SocketAddress & address,const std::string & protocol,bool final)171 void Port::AddAddress(const talk_base::SocketAddress& address,
172 const std::string& protocol,
173 bool final) {
174 Candidate c;
175 c.set_name(name_);
176 c.set_type(type_);
177 c.set_protocol(protocol);
178 c.set_address(address);
179 c.set_preference(preference_);
180 c.set_username(username_frag_);
181 c.set_password(password_);
182 c.set_network_name(network_->name());
183 c.set_generation(generation_);
184 candidates_.push_back(c);
185
186 if (final)
187 SignalAddressReady(this);
188 }
189
AddConnection(Connection * conn)190 void Port::AddConnection(Connection* conn) {
191 connections_[conn->remote_candidate().address()] = conn;
192 conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed);
193 SignalConnectionCreated(this, conn);
194 }
195
OnReadPacket(const char * data,size_t size,const talk_base::SocketAddress & addr)196 void Port::OnReadPacket(
197 const char* data, size_t size, const talk_base::SocketAddress& addr) {
198 // If the user has enabled port packets, just hand this over.
199 if (enable_port_packets_) {
200 SignalReadPacket(this, data, size, addr);
201 return;
202 }
203
204 // If this is an authenticated STUN request, then signal unknown address and
205 // send back a proper binding response.
206 StunMessage* msg;
207 std::string remote_username;
208 if (!GetStunMessage(data, size, addr, &msg, &remote_username)) {
209 LOG_J(LS_ERROR, this) << "Received non-STUN packet from unknown address ("
210 << addr.ToString() << ")";
211 } else if (!msg) {
212 // STUN message handled already
213 } else if (msg->type() == STUN_BINDING_REQUEST) {
214 SignalUnknownAddress(this, addr, msg, remote_username);
215 } else {
216 // NOTE(tschmelcher): This is benign. It occurs if we pruned a
217 // connection for this port while it had STUN requests in flight, because
218 // we then get back responses for them, which this code correctly does not
219 // handle.
220 LOG_J(LS_ERROR, this) << "Received unexpected STUN message type ("
221 << msg->type() << ") from unknown address ("
222 << addr.ToString() << ")";
223 delete msg;
224 }
225 }
226
GetStunMessage(const char * data,size_t size,const talk_base::SocketAddress & addr,StunMessage ** out_msg,std::string * out_username)227 bool Port::GetStunMessage(const char* data, size_t size,
228 const talk_base::SocketAddress& addr,
229 StunMessage** out_msg, std::string* out_username) {
230 // NOTE: This could clearly be optimized to avoid allocating any memory.
231 // However, at the data rates we'll be looking at on the client side,
232 // this probably isn't worth worrying about.
233 ASSERT(out_msg != NULL);
234 ASSERT(out_username != NULL);
235 *out_msg = NULL;
236 out_username->clear();
237
238 // Parse the request message. If the packet is not a complete and correct
239 // STUN message, then ignore it.
240 talk_base::scoped_ptr<StunMessage> stun_msg(new StunMessage());
241 talk_base::ByteBuffer buf(data, size);
242 if (!stun_msg->Read(&buf) || (buf.Length() > 0)) {
243 return false;
244 }
245
246 // The packet must include a username that either begins or ends with our
247 // fragment. It should begin with our fragment if it is a request and it
248 // should end with our fragment if it is a response.
249 const StunByteStringAttribute* username_attr =
250 stun_msg->GetByteString(STUN_ATTR_USERNAME);
251
252 int remote_frag_len = (username_attr ? username_attr->length() : 0);
253 remote_frag_len -= static_cast<int>(username_frag_.size());
254
255 if (stun_msg->type() == STUN_BINDING_REQUEST) {
256 if (remote_frag_len < 0) {
257 // Username not present or corrupted, don't reply.
258 LOG_J(LS_ERROR, this) << "Received STUN request without username from "
259 << addr.ToString();
260 return true;
261 } else if (std::memcmp(username_attr->bytes(), username_frag_.c_str(),
262 username_frag_.size()) != 0) {
263 LOG_J(LS_ERROR, this) << "Received STUN request with bad local username "
264 << std::string(username_attr->bytes(),
265 username_attr->length()) << " from "
266 << addr.ToString();
267 SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_BAD_REQUEST,
268 STUN_ERROR_REASON_BAD_REQUEST);
269 return true;
270 }
271
272 out_username->assign(username_attr->bytes() + username_frag_.size(),
273 username_attr->bytes() + username_attr->length());
274 } else if ((stun_msg->type() == STUN_BINDING_RESPONSE)
275 || (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) {
276 if (remote_frag_len < 0) {
277 LOG_J(LS_ERROR, this) << "Received STUN response without username from "
278 << addr.ToString();
279 // Do not send error response to a response
280 return true;
281 } else if (std::memcmp(username_attr->bytes() + remote_frag_len,
282 username_frag_.c_str(),
283 username_frag_.size()) != 0) {
284 LOG_J(LS_ERROR, this) << "Received STUN response with bad local username "
285 << std::string(username_attr->bytes(),
286 username_attr->length()) << " from "
287 << addr.ToString();
288 // Do not send error response to a response
289 return true;
290 }
291
292 out_username->assign(username_attr->bytes(),
293 username_attr->bytes() + remote_frag_len);
294
295 if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) {
296 if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) {
297 LOG_J(LS_ERROR, this) << "Received STUN binding error:"
298 << " class="
299 << static_cast<int>(error_code->error_class())
300 << " number="
301 << static_cast<int>(error_code->number())
302 << " reason='" << error_code->reason() << "'"
303 << " from " << addr.ToString();
304 // Return message to allow error-specific processing
305 } else {
306 LOG_J(LS_ERROR, this) << "Received STUN binding error without a error "
307 << "code from " << addr.ToString();
308 // Drop corrupt message
309 return true;
310 }
311 }
312 } else {
313 LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type ("
314 << stun_msg->type() << ") from " << addr.ToString();
315 return true;
316 }
317
318 // Return the STUN message found.
319 *out_msg = stun_msg.release();
320 return true;
321 }
322
SendBindingResponse(StunMessage * request,const talk_base::SocketAddress & addr)323 void Port::SendBindingResponse(StunMessage* request,
324 const talk_base::SocketAddress& addr) {
325 ASSERT(request->type() == STUN_BINDING_REQUEST);
326
327 // Retrieve the username from the request.
328 const StunByteStringAttribute* username_attr =
329 request->GetByteString(STUN_ATTR_USERNAME);
330 ASSERT(username_attr != NULL);
331 if (username_attr == NULL) {
332 // No valid username, skip the response.
333 return;
334 }
335
336 // Fill in the response message.
337 StunMessage response;
338 response.SetType(STUN_BINDING_RESPONSE);
339 response.SetTransactionID(request->transaction_id());
340
341 StunByteStringAttribute* username2_attr =
342 StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
343 username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
344 response.AddAttribute(username2_attr);
345
346 StunAddressAttribute* addr_attr =
347 StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS);
348 addr_attr->SetFamily(1);
349 addr_attr->SetPort(addr.port());
350 addr_attr->SetIP(addr.ip());
351 response.AddAttribute(addr_attr);
352
353 // Send the response message.
354 // NOTE: If we wanted to, this is where we would add the HMAC.
355 talk_base::ByteBuffer buf;
356 response.Write(&buf);
357 if (SendTo(buf.Data(), buf.Length(), addr, false) < 0) {
358 LOG_J(LS_ERROR, this) << "Failed to send STUN ping response to "
359 << addr.ToString();
360 }
361
362 // The fact that we received a successful request means that this connection
363 // (if one exists) should now be readable.
364 Connection* conn = GetConnection(addr);
365 ASSERT(conn != NULL);
366 if (conn)
367 conn->ReceivedPing();
368 }
369
SendBindingErrorResponse(StunMessage * request,const talk_base::SocketAddress & addr,int error_code,const std::string & reason)370 void Port::SendBindingErrorResponse(StunMessage* request,
371 const talk_base::SocketAddress& addr,
372 int error_code, const std::string& reason) {
373 ASSERT(request->type() == STUN_BINDING_REQUEST);
374
375 // Retrieve the username from the request. If it didn't have one, we
376 // shouldn't be responding at all.
377 const StunByteStringAttribute* username_attr =
378 request->GetByteString(STUN_ATTR_USERNAME);
379 ASSERT(username_attr != NULL);
380 if (username_attr == NULL) {
381 // No valid username, skip the response.
382 return;
383 }
384
385 // Fill in the response message.
386 StunMessage response;
387 response.SetType(STUN_BINDING_ERROR_RESPONSE);
388 response.SetTransactionID(request->transaction_id());
389
390 StunByteStringAttribute* username2_attr =
391 StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
392 username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
393 response.AddAttribute(username2_attr);
394
395 StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode();
396 error_attr->SetErrorCode(error_code);
397 error_attr->SetReason(reason);
398 response.AddAttribute(error_attr);
399
400 // Send the response message.
401 // NOTE: If we wanted to, this is where we would add the HMAC.
402 talk_base::ByteBuffer buf;
403 response.Write(&buf);
404 SendTo(buf.Data(), buf.Length(), addr, false);
405 LOG_J(LS_INFO, this) << "Sending STUN binding error: reason=" << reason
406 << " to " << addr.ToString();
407 }
408
OnMessage(talk_base::Message * pmsg)409 void Port::OnMessage(talk_base::Message *pmsg) {
410 ASSERT(pmsg->message_id == MSG_CHECKTIMEOUT);
411 ASSERT(lifetime_ == LT_PRETIMEOUT);
412 lifetime_ = LT_POSTTIMEOUT;
413 CheckTimeout();
414 }
415
ToString() const416 std::string Port::ToString() const {
417 std::stringstream ss;
418 ss << "Port[" << name_ << ":" << type_ << ":" << network_->ToString() << "]";
419 return ss.str();
420 }
421
EnablePortPackets()422 void Port::EnablePortPackets() {
423 enable_port_packets_ = true;
424 }
425
Start()426 void Port::Start() {
427 // The port sticks around for a minimum lifetime, after which
428 // we destroy it when it drops to zero connections.
429 if (lifetime_ == LT_PRESTART) {
430 lifetime_ = LT_PRETIMEOUT;
431 thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT);
432 } else {
433 LOG_J(LS_WARNING, this) << "Port restart attempted";
434 }
435 }
436
OnConnectionDestroyed(Connection * conn)437 void Port::OnConnectionDestroyed(Connection* conn) {
438 AddressMap::iterator iter =
439 connections_.find(conn->remote_candidate().address());
440 ASSERT(iter != connections_.end());
441 connections_.erase(iter);
442
443 CheckTimeout();
444 }
445
Destroy()446 void Port::Destroy() {
447 ASSERT(connections_.empty());
448 LOG_J(LS_INFO, this) << "Port deleted";
449 SignalDestroyed(this);
450 delete this;
451 }
452
CheckTimeout()453 void Port::CheckTimeout() {
454 // If this port has no connections, then there's no reason to keep it around.
455 // When the connections time out (both read and write), they will delete
456 // themselves, so if we have any connections, they are either readable or
457 // writable (or still connecting).
458 if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) {
459 Destroy();
460 }
461 }
462
463 // A ConnectionRequest is a simple STUN ping used to determine writability.
464 class ConnectionRequest : public StunRequest {
465 public:
ConnectionRequest(Connection * connection)466 explicit ConnectionRequest(Connection* connection) : connection_(connection) {
467 }
468
~ConnectionRequest()469 virtual ~ConnectionRequest() {
470 }
471
Prepare(StunMessage * request)472 virtual void Prepare(StunMessage* request) {
473 request->SetType(STUN_BINDING_REQUEST);
474 StunByteStringAttribute* username_attr =
475 StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
476 std::string username = connection_->remote_candidate().username();
477 username.append(connection_->port()->username_fragment());
478 username_attr->CopyBytes(username.c_str(), username.size());
479 request->AddAttribute(username_attr);
480 }
481
OnResponse(StunMessage * response)482 virtual void OnResponse(StunMessage* response) {
483 connection_->OnConnectionRequestResponse(this, response);
484 }
485
OnErrorResponse(StunMessage * response)486 virtual void OnErrorResponse(StunMessage* response) {
487 connection_->OnConnectionRequestErrorResponse(this, response);
488 }
489
OnTimeout()490 virtual void OnTimeout() {
491 connection_->OnConnectionRequestTimeout(this);
492 }
493
GetNextDelay()494 virtual int GetNextDelay() {
495 // Each request is sent only once. After a single delay , the request will
496 // time out.
497 timeout_ = true;
498 return CONNECTION_RESPONSE_TIMEOUT;
499 }
500
501 private:
502 Connection* connection_;
503 };
504
505 //
506 // Connection
507 //
508
Connection(Port * port,size_t index,const Candidate & remote_candidate)509 Connection::Connection(Port* port, size_t index,
510 const Candidate& remote_candidate)
511 : port_(port), local_candidate_index_(index),
512 remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT),
513 write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false),
514 requests_(port->thread()), rtt_(DEFAULT_RTT),
515 last_ping_sent_(0), last_ping_received_(0), last_data_received_(0),
516 reported_(false) {
517 // Wire up to send stun packets
518 requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket);
519 LOG_J(LS_INFO, this) << "Connection created";
520 }
521
~Connection()522 Connection::~Connection() {
523 }
524
local_candidate() const525 const Candidate& Connection::local_candidate() const {
526 if (local_candidate_index_ < port_->candidates().size())
527 return port_->candidates()[local_candidate_index_];
528 ASSERT(false);
529 static Candidate foo;
530 return foo;
531 }
532
set_read_state(ReadState value)533 void Connection::set_read_state(ReadState value) {
534 ReadState old_value = read_state_;
535 read_state_ = value;
536 if (value != old_value) {
537 LOG_J(LS_VERBOSE, this) << "set_read_state";
538 SignalStateChange(this);
539 CheckTimeout();
540 }
541 }
542
set_write_state(WriteState value)543 void Connection::set_write_state(WriteState value) {
544 WriteState old_value = write_state_;
545 write_state_ = value;
546 if (value != old_value) {
547 LOG_J(LS_VERBOSE, this) << "set_write_state";
548 SignalStateChange(this);
549 CheckTimeout();
550 }
551 }
552
set_connected(bool value)553 void Connection::set_connected(bool value) {
554 bool old_value = connected_;
555 connected_ = value;
556 if (value != old_value) {
557 LOG_J(LS_VERBOSE, this) << "set_connected";
558 }
559 }
560
OnSendStunPacket(const void * data,size_t size,StunRequest * req)561 void Connection::OnSendStunPacket(const void* data, size_t size,
562 StunRequest* req) {
563 if (port_->SendTo(data, size, remote_candidate_.address(), false) < 0) {
564 LOG_J(LS_WARNING, this) << "Failed to send STUN ping " << req->id();
565 }
566 }
567
OnReadPacket(const char * data,size_t size)568 void Connection::OnReadPacket(const char* data, size_t size) {
569 StunMessage* msg;
570 std::string remote_username;
571 const talk_base::SocketAddress& addr(remote_candidate_.address());
572 if (!port_->GetStunMessage(data, size, addr, &msg, &remote_username)) {
573 // The packet did not parse as a valid STUN message
574
575 // If this connection is readable, then pass along the packet.
576 if (read_state_ == STATE_READABLE) {
577 // readable means data from this address is acceptable
578 // Send it on!
579
580 last_data_received_ = talk_base::Time();
581 recv_rate_tracker_.Update(size);
582 SignalReadPacket(this, data, size);
583
584 // If timed out sending writability checks, start up again
585 if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
586 set_write_state(STATE_WRITE_CONNECT);
587 } else {
588 // Not readable means the remote address hasn't sent a valid
589 // binding request yet.
590
591 LOG_J(LS_WARNING, this)
592 << "Received non-STUN packet from an unreadable connection.";
593 }
594 } else if (!msg) {
595 // The packet was STUN, but was already handled internally.
596 } else if (remote_username != remote_candidate_.username()) {
597 // The packet had the right local username, but the remote username was
598 // not the right one for the remote address.
599 if (msg->type() == STUN_BINDING_REQUEST) {
600 LOG_J(LS_ERROR, this) << "Received STUN request with bad remote username "
601 << remote_username;
602 port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST,
603 STUN_ERROR_REASON_BAD_REQUEST);
604 } else if (msg->type() == STUN_BINDING_RESPONSE ||
605 msg->type() == STUN_BINDING_ERROR_RESPONSE) {
606 LOG_J(LS_ERROR, this) << "Received STUN response with bad remote username"
607 " " << remote_username;
608 }
609 delete msg;
610 } else {
611 // The packet is STUN, with the right username.
612 // If this is a STUN request, then update the readable bit and respond.
613 // If this is a STUN response, then update the writable bit.
614
615 switch (msg->type()) {
616 case STUN_BINDING_REQUEST:
617 // Incoming, validated stun request from remote peer.
618 // This call will also set the connection readable.
619
620 port_->SendBindingResponse(msg, addr);
621
622 // If timed out sending writability checks, start up again
623 if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
624 set_write_state(STATE_WRITE_CONNECT);
625 break;
626
627 case STUN_BINDING_RESPONSE:
628 case STUN_BINDING_ERROR_RESPONSE:
629 // Response from remote peer. Does it match request sent?
630 // This doesn't just check, it makes callbacks if transaction
631 // id's match
632 requests_.CheckResponse(msg);
633 break;
634
635 default:
636 ASSERT(false);
637 break;
638 }
639
640 // Done with the message; delete
641
642 delete msg;
643 }
644 }
645
Prune()646 void Connection::Prune() {
647 if (!pruned_) {
648 LOG_J(LS_VERBOSE, this) << "Connection pruned";
649 pruned_ = true;
650 requests_.Clear();
651 set_write_state(STATE_WRITE_TIMEOUT);
652 }
653 }
654
Destroy()655 void Connection::Destroy() {
656 LOG_J(LS_VERBOSE, this) << "Connection destroyed";
657 set_read_state(STATE_READ_TIMEOUT);
658 set_write_state(STATE_WRITE_TIMEOUT);
659 }
660
UpdateState(uint32 now)661 void Connection::UpdateState(uint32 now) {
662 uint32 rtt = ConservativeRTTEstimate(rtt_);
663
664 std::string pings;
665 for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
666 char buf[32];
667 talk_base::sprintfn(buf, sizeof(buf), "%u",
668 pings_since_last_response_[i]);
669 pings.append(buf).append(" ");
670 }
671 LOG_J(LS_VERBOSE, this) << "UpdateState(): pings_since_last_response_=" <<
672 pings << ", rtt=" << rtt << ", now=" << now;
673
674 // Check the readable state.
675 //
676 // Since we don't know how many pings the other side has attempted, the best
677 // test we can do is a simple window.
678
679 if ((read_state_ == STATE_READABLE) &&
680 (last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) {
681 LOG_J(LS_INFO, this) << "Unreadable after "
682 << now - last_ping_received_
683 << " ms without a ping, rtt=" << rtt;
684 set_read_state(STATE_READ_TIMEOUT);
685 }
686
687 // Check the writable state. (The order of these checks is important.)
688 //
689 // Before becoming unwritable, we allow for a fixed number of pings to fail
690 // (i.e., receive no response). We also have to give the response time to
691 // get back, so we include a conservative estimate of this.
692 //
693 // Before timing out writability, we give a fixed amount of time. This is to
694 // allow for changes in network conditions.
695
696 if ((write_state_ == STATE_WRITABLE) &&
697 TooManyFailures(pings_since_last_response_,
698 CONNECTION_WRITE_CONNECT_FAILURES,
699 rtt,
700 now) &&
701 TooLongWithoutResponse(pings_since_last_response_,
702 CONNECTION_WRITE_CONNECT_TIMEOUT,
703 now)) {
704 uint32 max_pings = CONNECTION_WRITE_CONNECT_FAILURES;
705 LOG_J(LS_INFO, this) << "Unwritable after " << max_pings
706 << " ping failures and "
707 << now - pings_since_last_response_[0]
708 << " ms without a response,"
709 << " ms since last received ping="
710 << now - last_ping_received_
711 << " ms since last received data="
712 << now - last_data_received_
713 << " rtt=" << rtt;
714 set_write_state(STATE_WRITE_CONNECT);
715 }
716
717 if ((write_state_ == STATE_WRITE_CONNECT) &&
718 TooLongWithoutResponse(pings_since_last_response_,
719 CONNECTION_WRITE_TIMEOUT,
720 now)) {
721 LOG_J(LS_INFO, this) << "Timed out after "
722 << now - pings_since_last_response_[0]
723 << " ms without a response, rtt=" << rtt;
724 set_write_state(STATE_WRITE_TIMEOUT);
725 }
726 }
727
Ping(uint32 now)728 void Connection::Ping(uint32 now) {
729 ASSERT(connected_);
730 last_ping_sent_ = now;
731 pings_since_last_response_.push_back(now);
732 ConnectionRequest *req = new ConnectionRequest(this);
733 LOG_J(LS_VERBOSE, this) << "Sending STUN ping " << req->id() << " at " << now;
734 requests_.Send(req);
735 }
736
ReceivedPing()737 void Connection::ReceivedPing() {
738 last_ping_received_ = talk_base::Time();
739 set_read_state(STATE_READABLE);
740 }
741
ToString() const742 std::string Connection::ToString() const {
743 const char CONNECT_STATE_ABBREV[2] = {
744 '-', // not connected (false)
745 'C', // connected (true)
746 };
747 const char READ_STATE_ABBREV[2] = {
748 'R', // STATE_READABLE
749 '-', // STATE_READ_TIMEOUT
750 };
751 const char WRITE_STATE_ABBREV[3] = {
752 'W', // STATE_WRITABLE
753 'w', // STATE_WRITE_CONNECT
754 '-', // STATE_WRITE_TIMEOUT
755 };
756 const Candidate& local = local_candidate();
757 const Candidate& remote = remote_candidate();
758 std::stringstream ss;
759 ss << "Conn[" << local.generation()
760 << ":" << local.name() << ":" << local.type() << ":"
761 << local.protocol() << ":" << local.address().ToString()
762 << "->" << remote.name() << ":" << remote.type() << ":"
763 << remote.protocol() << ":" << remote.address().ToString()
764 << "|"
765 << CONNECT_STATE_ABBREV[connected()]
766 << READ_STATE_ABBREV[read_state()]
767 << WRITE_STATE_ABBREV[write_state()]
768 << "|";
769 if (rtt_ < DEFAULT_RTT) {
770 ss << rtt_ << "]";
771 } else {
772 ss << "-]";
773 }
774 return ss.str();
775 }
776
OnConnectionRequestResponse(ConnectionRequest * request,StunMessage * response)777 void Connection::OnConnectionRequestResponse(ConnectionRequest* request,
778 StunMessage* response) {
779 // We've already validated that this is a STUN binding response with
780 // the correct local and remote username for this connection.
781 // So if we're not already, become writable. We may be bringing a pruned
782 // connection back to life, but if we don't really want it, we can always
783 // prune it again.
784 uint32 rtt = request->Elapsed();
785 set_write_state(STATE_WRITABLE);
786
787 std::string pings;
788 for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
789 char buf[32];
790 talk_base::sprintfn(buf, sizeof(buf), "%u",
791 pings_since_last_response_[i]);
792 pings.append(buf).append(" ");
793 }
794
795 LOG_J(LS_VERBOSE, this) << "Received STUN ping response " << request->id()
796 << ", pings_since_last_response_=" << pings
797 << ", rtt=" << rtt;
798
799 pings_since_last_response_.clear();
800 rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1);
801 }
802
OnConnectionRequestErrorResponse(ConnectionRequest * request,StunMessage * response)803 void Connection::OnConnectionRequestErrorResponse(ConnectionRequest* request,
804 StunMessage* response) {
805 const StunErrorCodeAttribute* error = response->GetErrorCode();
806 uint32 error_code = error ?
807 error->error_code() : static_cast<uint32>(STUN_ERROR_GLOBAL_FAILURE);
808
809 if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE)
810 || (error_code == STUN_ERROR_SERVER_ERROR)
811 || (error_code == STUN_ERROR_UNAUTHORIZED)) {
812 // Recoverable error, retry
813 } else if (error_code == STUN_ERROR_STALE_CREDENTIALS) {
814 // Race failure, retry
815 } else {
816 // This is not a valid connection.
817 LOG_J(LS_ERROR, this) << "Received STUN error response, code="
818 << error_code << "; killing connection";
819 set_write_state(STATE_WRITE_TIMEOUT);
820 }
821 }
822
OnConnectionRequestTimeout(ConnectionRequest * request)823 void Connection::OnConnectionRequestTimeout(ConnectionRequest* request) {
824 // Log at LS_INFO if we miss a ping on a writable connection.
825 talk_base::LoggingSeverity sev = (write_state_ == STATE_WRITABLE) ?
826 talk_base::LS_INFO : talk_base::LS_VERBOSE;
827 uint32 when = talk_base::Time() - request->Elapsed();
828 size_t failures;
829 for (failures = 0; failures < pings_since_last_response_.size(); ++failures) {
830 if (pings_since_last_response_[failures] > when) {
831 break;
832 }
833 }
834 LOG_JV(sev, this) << "Timing-out STUN ping " << request->id()
835 << " after " << request->Elapsed()
836 << " ms, failures=" << failures;
837 }
838
CheckTimeout()839 void Connection::CheckTimeout() {
840 // If both read and write have timed out, then this connection can contribute
841 // no more to p2p socket unless at some later date readability were to come
842 // back. However, we gave readability a long time to timeout, so at this
843 // point, it seems fair to get rid of this connection.
844 if ((read_state_ == STATE_READ_TIMEOUT) &&
845 (write_state_ == STATE_WRITE_TIMEOUT)) {
846 port_->thread()->Post(this, MSG_DELETE);
847 }
848 }
849
OnMessage(talk_base::Message * pmsg)850 void Connection::OnMessage(talk_base::Message *pmsg) {
851 ASSERT(pmsg->message_id == MSG_DELETE);
852
853 LOG_J(LS_INFO, this) << "Connection deleted";
854 SignalDestroyed(this);
855 delete this;
856 }
857
recv_bytes_second()858 size_t Connection::recv_bytes_second() {
859 return recv_rate_tracker_.units_second();
860 }
861
recv_total_bytes()862 size_t Connection::recv_total_bytes() {
863 return recv_rate_tracker_.total_units();
864 }
865
sent_bytes_second()866 size_t Connection::sent_bytes_second() {
867 return send_rate_tracker_.units_second();
868 }
869
sent_total_bytes()870 size_t Connection::sent_total_bytes() {
871 return send_rate_tracker_.total_units();
872 }
873
ProxyConnection(Port * port,size_t index,const Candidate & candidate)874 ProxyConnection::ProxyConnection(Port* port, size_t index,
875 const Candidate& candidate)
876 : Connection(port, index, candidate), error_(0) {
877 }
878
Send(const void * data,size_t size)879 int ProxyConnection::Send(const void* data, size_t size) {
880 if (write_state() != STATE_WRITABLE) {
881 error_ = EWOULDBLOCK;
882 return SOCKET_ERROR;
883 }
884 int sent = port_->SendTo(data, size, remote_candidate_.address(), true);
885 if (sent <= 0) {
886 ASSERT(sent < 0);
887 error_ = port_->GetError();
888 } else {
889 send_rate_tracker_.Update(sent);
890 }
891 return sent;
892 }
893
894 } // namespace cricket
895