1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/base/virtualsocketserver.h"
12
13 #include <errno.h>
14 #include <math.h>
15
16 #include <algorithm>
17 #include <map>
18 #include <vector>
19
20 #include "webrtc/base/common.h"
21 #include "webrtc/base/logging.h"
22 #include "webrtc/base/physicalsocketserver.h"
23 #include "webrtc/base/socketaddresspair.h"
24 #include "webrtc/base/thread.h"
25 #include "webrtc/base/timeutils.h"
26
27 namespace rtc {
28 #if defined(WEBRTC_WIN)
29 const in_addr kInitialNextIPv4 = { {0x01, 0, 0, 0} };
30 #else
31 // This value is entirely arbitrary, hence the lack of concern about endianness.
32 const in_addr kInitialNextIPv4 = { 0x01000000 };
33 #endif
34 // Starts at ::2 so as to not cause confusion with ::1.
35 const in6_addr kInitialNextIPv6 = { { {
36 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2
37 } } };
38
39 const uint16 kFirstEphemeralPort = 49152;
40 const uint16 kLastEphemeralPort = 65535;
41 const uint16 kEphemeralPortCount = kLastEphemeralPort - kFirstEphemeralPort + 1;
42 const uint32 kDefaultNetworkCapacity = 64 * 1024;
43 const uint32 kDefaultTcpBufferSize = 32 * 1024;
44
45 const uint32 UDP_HEADER_SIZE = 28; // IP + UDP headers
46 const uint32 TCP_HEADER_SIZE = 40; // IP + TCP headers
47 const uint32 TCP_MSS = 1400; // Maximum segment size
48
49 // Note: The current algorithm doesn't work for sample sizes smaller than this.
50 const int NUM_SAMPLES = 1000;
51
52 enum {
53 MSG_ID_PACKET,
54 MSG_ID_CONNECT,
55 MSG_ID_DISCONNECT,
56 };
57
58 // Packets are passed between sockets as messages. We copy the data just like
59 // the kernel does.
60 class Packet : public MessageData {
61 public:
Packet(const char * data,size_t size,const SocketAddress & from)62 Packet(const char* data, size_t size, const SocketAddress& from)
63 : size_(size), consumed_(0), from_(from) {
64 ASSERT(NULL != data);
65 data_ = new char[size_];
66 memcpy(data_, data, size_);
67 }
68
~Packet()69 virtual ~Packet() {
70 delete[] data_;
71 }
72
data() const73 const char* data() const { return data_ + consumed_; }
size() const74 size_t size() const { return size_ - consumed_; }
from() const75 const SocketAddress& from() const { return from_; }
76
77 // Remove the first size bytes from the data.
Consume(size_t size)78 void Consume(size_t size) {
79 ASSERT(size + consumed_ < size_);
80 consumed_ += size;
81 }
82
83 private:
84 char* data_;
85 size_t size_, consumed_;
86 SocketAddress from_;
87 };
88
89 struct MessageAddress : public MessageData {
MessageAddressrtc::MessageAddress90 explicit MessageAddress(const SocketAddress& a) : addr(a) { }
91 SocketAddress addr;
92 };
93
94 // Implements the socket interface using the virtual network. Packets are
95 // passed as messages using the message queue of the socket server.
96 class VirtualSocket : public AsyncSocket, public MessageHandler {
97 public:
VirtualSocket(VirtualSocketServer * server,int family,int type,bool async)98 VirtualSocket(VirtualSocketServer* server, int family, int type, bool async)
99 : server_(server), family_(family), type_(type), async_(async),
100 state_(CS_CLOSED), error_(0), listen_queue_(NULL),
101 write_enabled_(false),
102 network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) {
103 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
104 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams
105 }
106
~VirtualSocket()107 virtual ~VirtualSocket() {
108 Close();
109
110 for (RecvBuffer::iterator it = recv_buffer_.begin();
111 it != recv_buffer_.end(); ++it) {
112 delete *it;
113 }
114 }
115
GetLocalAddress() const116 virtual SocketAddress GetLocalAddress() const {
117 return local_addr_;
118 }
119
GetRemoteAddress() const120 virtual SocketAddress GetRemoteAddress() const {
121 return remote_addr_;
122 }
123
124 // Used by server sockets to set the local address without binding.
SetLocalAddress(const SocketAddress & addr)125 void SetLocalAddress(const SocketAddress& addr) {
126 local_addr_ = addr;
127 }
128
Bind(const SocketAddress & addr)129 virtual int Bind(const SocketAddress& addr) {
130 if (!local_addr_.IsNil()) {
131 error_ = EINVAL;
132 return -1;
133 }
134 local_addr_ = addr;
135 int result = server_->Bind(this, &local_addr_);
136 if (result != 0) {
137 local_addr_.Clear();
138 error_ = EADDRINUSE;
139 } else {
140 bound_ = true;
141 was_any_ = addr.IsAnyIP();
142 }
143 return result;
144 }
145
Connect(const SocketAddress & addr)146 virtual int Connect(const SocketAddress& addr) {
147 return InitiateConnect(addr, true);
148 }
149
Close()150 virtual int Close() {
151 if (!local_addr_.IsNil() && bound_) {
152 // Remove from the binding table.
153 server_->Unbind(local_addr_, this);
154 bound_ = false;
155 }
156
157 if (SOCK_STREAM == type_) {
158 // Cancel pending sockets
159 if (listen_queue_) {
160 while (!listen_queue_->empty()) {
161 SocketAddress addr = listen_queue_->front();
162
163 // Disconnect listening socket.
164 server_->Disconnect(server_->LookupBinding(addr));
165 listen_queue_->pop_front();
166 }
167 delete listen_queue_;
168 listen_queue_ = NULL;
169 }
170 // Disconnect stream sockets
171 if (CS_CONNECTED == state_) {
172 // Disconnect remote socket, check if it is a child of a server socket.
173 VirtualSocket* socket =
174 server_->LookupConnection(local_addr_, remote_addr_);
175 if (!socket) {
176 // Not a server socket child, then see if it is bound.
177 // TODO: If this is indeed a server socket that has no
178 // children this will cause the server socket to be
179 // closed. This might lead to unexpected results, how to fix this?
180 socket = server_->LookupBinding(remote_addr_);
181 }
182 server_->Disconnect(socket);
183
184 // Remove mapping for both directions.
185 server_->RemoveConnection(remote_addr_, local_addr_);
186 server_->RemoveConnection(local_addr_, remote_addr_);
187 }
188 // Cancel potential connects
189 MessageList msgs;
190 if (server_->msg_queue_) {
191 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
192 }
193 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
194 ASSERT(NULL != it->pdata);
195 MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
196
197 // Lookup remote side.
198 VirtualSocket* socket = server_->LookupConnection(local_addr_,
199 data->addr);
200 if (socket) {
201 // Server socket, remote side is a socket retreived by
202 // accept. Accepted sockets are not bound so we will not
203 // find it by looking in the bindings table.
204 server_->Disconnect(socket);
205 server_->RemoveConnection(local_addr_, data->addr);
206 } else {
207 server_->Disconnect(server_->LookupBinding(data->addr));
208 }
209 delete data;
210 }
211 // Clear incoming packets and disconnect messages
212 if (server_->msg_queue_) {
213 server_->msg_queue_->Clear(this);
214 }
215 }
216
217 state_ = CS_CLOSED;
218 local_addr_.Clear();
219 remote_addr_.Clear();
220 return 0;
221 }
222
Send(const void * pv,size_t cb)223 virtual int Send(const void *pv, size_t cb) {
224 if (CS_CONNECTED != state_) {
225 error_ = ENOTCONN;
226 return -1;
227 }
228 if (SOCK_DGRAM == type_) {
229 return SendUdp(pv, cb, remote_addr_);
230 } else {
231 return SendTcp(pv, cb);
232 }
233 }
234
SendTo(const void * pv,size_t cb,const SocketAddress & addr)235 virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
236 if (SOCK_DGRAM == type_) {
237 return SendUdp(pv, cb, addr);
238 } else {
239 if (CS_CONNECTED != state_) {
240 error_ = ENOTCONN;
241 return -1;
242 }
243 return SendTcp(pv, cb);
244 }
245 }
246
Recv(void * pv,size_t cb)247 virtual int Recv(void *pv, size_t cb) {
248 SocketAddress addr;
249 return RecvFrom(pv, cb, &addr);
250 }
251
RecvFrom(void * pv,size_t cb,SocketAddress * paddr)252 virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
253 // If we don't have a packet, then either error or wait for one to arrive.
254 if (recv_buffer_.empty()) {
255 if (async_) {
256 error_ = EAGAIN;
257 return -1;
258 }
259 while (recv_buffer_.empty()) {
260 Message msg;
261 server_->msg_queue_->Get(&msg);
262 server_->msg_queue_->Dispatch(&msg);
263 }
264 }
265
266 // Return the packet at the front of the queue.
267 Packet* packet = recv_buffer_.front();
268 size_t data_read = _min(cb, packet->size());
269 memcpy(pv, packet->data(), data_read);
270 *paddr = packet->from();
271
272 if (data_read < packet->size()) {
273 packet->Consume(data_read);
274 } else {
275 recv_buffer_.pop_front();
276 delete packet;
277 }
278
279 if (SOCK_STREAM == type_) {
280 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
281 recv_buffer_size_ -= data_read;
282 if (was_full) {
283 VirtualSocket* sender = server_->LookupBinding(remote_addr_);
284 ASSERT(NULL != sender);
285 server_->SendTcp(sender);
286 }
287 }
288
289 return static_cast<int>(data_read);
290 }
291
Listen(int backlog)292 virtual int Listen(int backlog) {
293 ASSERT(SOCK_STREAM == type_);
294 ASSERT(CS_CLOSED == state_);
295 if (local_addr_.IsNil()) {
296 error_ = EINVAL;
297 return -1;
298 }
299 ASSERT(NULL == listen_queue_);
300 listen_queue_ = new ListenQueue;
301 state_ = CS_CONNECTING;
302 return 0;
303 }
304
Accept(SocketAddress * paddr)305 virtual VirtualSocket* Accept(SocketAddress *paddr) {
306 if (NULL == listen_queue_) {
307 error_ = EINVAL;
308 return NULL;
309 }
310 while (!listen_queue_->empty()) {
311 VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_,
312 async_);
313
314 // Set the new local address to the same as this server socket.
315 socket->SetLocalAddress(local_addr_);
316 // Sockets made from a socket that 'was Any' need to inherit that.
317 socket->set_was_any(was_any_);
318 SocketAddress remote_addr(listen_queue_->front());
319 int result = socket->InitiateConnect(remote_addr, false);
320 listen_queue_->pop_front();
321 if (result != 0) {
322 delete socket;
323 continue;
324 }
325 socket->CompleteConnect(remote_addr, false);
326 if (paddr) {
327 *paddr = remote_addr;
328 }
329 return socket;
330 }
331 error_ = EWOULDBLOCK;
332 return NULL;
333 }
334
GetError() const335 virtual int GetError() const {
336 return error_;
337 }
338
SetError(int error)339 virtual void SetError(int error) {
340 error_ = error;
341 }
342
GetState() const343 virtual ConnState GetState() const {
344 return state_;
345 }
346
GetOption(Option opt,int * value)347 virtual int GetOption(Option opt, int* value) {
348 OptionsMap::const_iterator it = options_map_.find(opt);
349 if (it == options_map_.end()) {
350 return -1;
351 }
352 *value = it->second;
353 return 0; // 0 is success to emulate getsockopt()
354 }
355
SetOption(Option opt,int value)356 virtual int SetOption(Option opt, int value) {
357 options_map_[opt] = value;
358 return 0; // 0 is success to emulate setsockopt()
359 }
360
EstimateMTU(uint16 * mtu)361 virtual int EstimateMTU(uint16* mtu) {
362 if (CS_CONNECTED != state_)
363 return ENOTCONN;
364 else
365 return 65536;
366 }
367
OnMessage(Message * pmsg)368 void OnMessage(Message *pmsg) {
369 if (pmsg->message_id == MSG_ID_PACKET) {
370 //ASSERT(!local_addr_.IsAny());
371 ASSERT(NULL != pmsg->pdata);
372 Packet* packet = static_cast<Packet*>(pmsg->pdata);
373
374 recv_buffer_.push_back(packet);
375
376 if (async_) {
377 SignalReadEvent(this);
378 }
379 } else if (pmsg->message_id == MSG_ID_CONNECT) {
380 ASSERT(NULL != pmsg->pdata);
381 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
382 if (listen_queue_ != NULL) {
383 listen_queue_->push_back(data->addr);
384 if (async_) {
385 SignalReadEvent(this);
386 }
387 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
388 CompleteConnect(data->addr, true);
389 } else {
390 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
391 server_->Disconnect(server_->LookupBinding(data->addr));
392 }
393 delete data;
394 } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
395 ASSERT(SOCK_STREAM == type_);
396 if (CS_CLOSED != state_) {
397 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
398 state_ = CS_CLOSED;
399 remote_addr_.Clear();
400 if (async_) {
401 SignalCloseEvent(this, error);
402 }
403 }
404 } else {
405 ASSERT(false);
406 }
407 }
408
was_any()409 bool was_any() { return was_any_; }
set_was_any(bool was_any)410 void set_was_any(bool was_any) { was_any_ = was_any; }
411
412 private:
413 struct NetworkEntry {
414 size_t size;
415 uint32 done_time;
416 };
417
418 typedef std::deque<SocketAddress> ListenQueue;
419 typedef std::deque<NetworkEntry> NetworkQueue;
420 typedef std::vector<char> SendBuffer;
421 typedef std::list<Packet*> RecvBuffer;
422 typedef std::map<Option, int> OptionsMap;
423
InitiateConnect(const SocketAddress & addr,bool use_delay)424 int InitiateConnect(const SocketAddress& addr, bool use_delay) {
425 if (!remote_addr_.IsNil()) {
426 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
427 return -1;
428 }
429 if (local_addr_.IsNil()) {
430 // If there's no local address set, grab a random one in the correct AF.
431 int result = 0;
432 if (addr.ipaddr().family() == AF_INET) {
433 result = Bind(SocketAddress("0.0.0.0", 0));
434 } else if (addr.ipaddr().family() == AF_INET6) {
435 result = Bind(SocketAddress("::", 0));
436 }
437 if (result != 0) {
438 return result;
439 }
440 }
441 if (type_ == SOCK_DGRAM) {
442 remote_addr_ = addr;
443 state_ = CS_CONNECTED;
444 } else {
445 int result = server_->Connect(this, addr, use_delay);
446 if (result != 0) {
447 error_ = EHOSTUNREACH;
448 return -1;
449 }
450 state_ = CS_CONNECTING;
451 }
452 return 0;
453 }
454
CompleteConnect(const SocketAddress & addr,bool notify)455 void CompleteConnect(const SocketAddress& addr, bool notify) {
456 ASSERT(CS_CONNECTING == state_);
457 remote_addr_ = addr;
458 state_ = CS_CONNECTED;
459 server_->AddConnection(remote_addr_, local_addr_, this);
460 if (async_ && notify) {
461 SignalConnectEvent(this);
462 }
463 }
464
SendUdp(const void * pv,size_t cb,const SocketAddress & addr)465 int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) {
466 // If we have not been assigned a local port, then get one.
467 if (local_addr_.IsNil()) {
468 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
469 int result = server_->Bind(this, &local_addr_);
470 if (result != 0) {
471 local_addr_.Clear();
472 error_ = EADDRINUSE;
473 return result;
474 }
475 }
476
477 // Send the data in a message to the appropriate socket.
478 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
479 }
480
SendTcp(const void * pv,size_t cb)481 int SendTcp(const void* pv, size_t cb) {
482 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
483 if (0 == capacity) {
484 write_enabled_ = true;
485 error_ = EWOULDBLOCK;
486 return -1;
487 }
488 size_t consumed = _min(cb, capacity);
489 const char* cpv = static_cast<const char*>(pv);
490 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
491 server_->SendTcp(this);
492 return static_cast<int>(consumed);
493 }
494
495 VirtualSocketServer* server_;
496 int family_;
497 int type_;
498 bool async_;
499 ConnState state_;
500 int error_;
501 SocketAddress local_addr_;
502 SocketAddress remote_addr_;
503
504 // Pending sockets which can be Accepted
505 ListenQueue* listen_queue_;
506
507 // Data which tcp has buffered for sending
508 SendBuffer send_buffer_;
509 bool write_enabled_;
510
511 // Critical section to protect the recv_buffer and queue_
512 CriticalSection crit_;
513
514 // Network model that enforces bandwidth and capacity constraints
515 NetworkQueue network_;
516 size_t network_size_;
517
518 // Data which has been received from the network
519 RecvBuffer recv_buffer_;
520 // The amount of data which is in flight or in recv_buffer_
521 size_t recv_buffer_size_;
522
523 // Is this socket bound?
524 bool bound_;
525
526 // When we bind a socket to Any, VSS's Bind gives it another address. For
527 // dual-stack sockets, we want to distinguish between sockets that were
528 // explicitly given a particular address and sockets that had one picked
529 // for them by VSS.
530 bool was_any_;
531
532 // Store the options that are set
533 OptionsMap options_map_;
534
535 friend class VirtualSocketServer;
536 };
537
VirtualSocketServer(SocketServer * ss)538 VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
539 : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false),
540 network_delay_(Time()), next_ipv4_(kInitialNextIPv4),
541 next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort),
542 bindings_(new AddressMap()), connections_(new ConnectionMap()),
543 bandwidth_(0), network_capacity_(kDefaultNetworkCapacity),
544 send_buffer_capacity_(kDefaultTcpBufferSize),
545 recv_buffer_capacity_(kDefaultTcpBufferSize),
546 delay_mean_(0), delay_stddev_(0), delay_samples_(NUM_SAMPLES),
547 delay_dist_(NULL), drop_prob_(0.0) {
548 if (!server_) {
549 server_ = new PhysicalSocketServer();
550 server_owned_ = true;
551 }
552 UpdateDelayDistribution();
553 }
554
~VirtualSocketServer()555 VirtualSocketServer::~VirtualSocketServer() {
556 delete bindings_;
557 delete connections_;
558 delete delay_dist_;
559 if (server_owned_) {
560 delete server_;
561 }
562 }
563
GetNextIP(int family)564 IPAddress VirtualSocketServer::GetNextIP(int family) {
565 if (family == AF_INET) {
566 IPAddress next_ip(next_ipv4_);
567 next_ipv4_.s_addr =
568 HostToNetwork32(NetworkToHost32(next_ipv4_.s_addr) + 1);
569 return next_ip;
570 } else if (family == AF_INET6) {
571 IPAddress next_ip(next_ipv6_);
572 uint32* as_ints = reinterpret_cast<uint32*>(&next_ipv6_.s6_addr);
573 as_ints[3] += 1;
574 return next_ip;
575 }
576 return IPAddress();
577 }
578
GetNextPort()579 uint16 VirtualSocketServer::GetNextPort() {
580 uint16 port = next_port_;
581 if (next_port_ < kLastEphemeralPort) {
582 ++next_port_;
583 } else {
584 next_port_ = kFirstEphemeralPort;
585 }
586 return port;
587 }
588
CreateSocket(int type)589 Socket* VirtualSocketServer::CreateSocket(int type) {
590 return CreateSocket(AF_INET, type);
591 }
592
CreateSocket(int family,int type)593 Socket* VirtualSocketServer::CreateSocket(int family, int type) {
594 return CreateSocketInternal(family, type);
595 }
596
CreateAsyncSocket(int type)597 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
598 return CreateAsyncSocket(AF_INET, type);
599 }
600
CreateAsyncSocket(int family,int type)601 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) {
602 return CreateSocketInternal(family, type);
603 }
604
CreateSocketInternal(int family,int type)605 VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
606 return new VirtualSocket(this, family, type, true);
607 }
608
SetMessageQueue(MessageQueue * msg_queue)609 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
610 msg_queue_ = msg_queue;
611 if (msg_queue_) {
612 msg_queue_->SignalQueueDestroyed.connect(this,
613 &VirtualSocketServer::OnMessageQueueDestroyed);
614 }
615 }
616
Wait(int cmsWait,bool process_io)617 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
618 ASSERT(msg_queue_ == Thread::Current());
619 if (stop_on_idle_ && Thread::Current()->empty()) {
620 return false;
621 }
622 return socketserver()->Wait(cmsWait, process_io);
623 }
624
WakeUp()625 void VirtualSocketServer::WakeUp() {
626 socketserver()->WakeUp();
627 }
628
ProcessMessagesUntilIdle()629 bool VirtualSocketServer::ProcessMessagesUntilIdle() {
630 ASSERT(msg_queue_ == Thread::Current());
631 stop_on_idle_ = true;
632 while (!msg_queue_->empty()) {
633 Message msg;
634 if (msg_queue_->Get(&msg, kForever)) {
635 msg_queue_->Dispatch(&msg);
636 }
637 }
638 stop_on_idle_ = false;
639 return !msg_queue_->IsQuitting();
640 }
641
SetNextPortForTesting(uint16 port)642 void VirtualSocketServer::SetNextPortForTesting(uint16 port) {
643 next_port_ = port;
644 }
645
Bind(VirtualSocket * socket,const SocketAddress & addr)646 int VirtualSocketServer::Bind(VirtualSocket* socket,
647 const SocketAddress& addr) {
648 ASSERT(NULL != socket);
649 // Address must be completely specified at this point
650 ASSERT(!IPIsUnspec(addr.ipaddr()));
651 ASSERT(addr.port() != 0);
652
653 // Normalize the address (turns v6-mapped addresses into v4-addresses).
654 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
655
656 AddressMap::value_type entry(normalized, socket);
657 return bindings_->insert(entry).second ? 0 : -1;
658 }
659
Bind(VirtualSocket * socket,SocketAddress * addr)660 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
661 ASSERT(NULL != socket);
662
663 if (IPIsAny(addr->ipaddr())) {
664 addr->SetIP(GetNextIP(addr->ipaddr().family()));
665 } else if (!IPIsUnspec(addr->ipaddr())) {
666 addr->SetIP(addr->ipaddr().Normalized());
667 } else {
668 ASSERT(false);
669 }
670
671 if (addr->port() == 0) {
672 for (int i = 0; i < kEphemeralPortCount; ++i) {
673 addr->SetPort(GetNextPort());
674 if (bindings_->find(*addr) == bindings_->end()) {
675 break;
676 }
677 }
678 }
679
680 return Bind(socket, *addr);
681 }
682
LookupBinding(const SocketAddress & addr)683 VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) {
684 SocketAddress normalized(addr.ipaddr().Normalized(),
685 addr.port());
686 AddressMap::iterator it = bindings_->find(normalized);
687 return (bindings_->end() != it) ? it->second : NULL;
688 }
689
Unbind(const SocketAddress & addr,VirtualSocket * socket)690 int VirtualSocketServer::Unbind(const SocketAddress& addr,
691 VirtualSocket* socket) {
692 SocketAddress normalized(addr.ipaddr().Normalized(),
693 addr.port());
694 ASSERT((*bindings_)[normalized] == socket);
695 bindings_->erase(bindings_->find(normalized));
696 return 0;
697 }
698
AddConnection(const SocketAddress & local,const SocketAddress & remote,VirtualSocket * remote_socket)699 void VirtualSocketServer::AddConnection(const SocketAddress& local,
700 const SocketAddress& remote,
701 VirtualSocket* remote_socket) {
702 // Add this socket pair to our routing table. This will allow
703 // multiple clients to connect to the same server address.
704 SocketAddress local_normalized(local.ipaddr().Normalized(),
705 local.port());
706 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
707 remote.port());
708 SocketAddressPair address_pair(local_normalized, remote_normalized);
709 connections_->insert(std::pair<SocketAddressPair,
710 VirtualSocket*>(address_pair, remote_socket));
711 }
712
LookupConnection(const SocketAddress & local,const SocketAddress & remote)713 VirtualSocket* VirtualSocketServer::LookupConnection(
714 const SocketAddress& local,
715 const SocketAddress& remote) {
716 SocketAddress local_normalized(local.ipaddr().Normalized(),
717 local.port());
718 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
719 remote.port());
720 SocketAddressPair address_pair(local_normalized, remote_normalized);
721 ConnectionMap::iterator it = connections_->find(address_pair);
722 return (connections_->end() != it) ? it->second : NULL;
723 }
724
RemoveConnection(const SocketAddress & local,const SocketAddress & remote)725 void VirtualSocketServer::RemoveConnection(const SocketAddress& local,
726 const SocketAddress& remote) {
727 SocketAddress local_normalized(local.ipaddr().Normalized(),
728 local.port());
729 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
730 remote.port());
731 SocketAddressPair address_pair(local_normalized, remote_normalized);
732 connections_->erase(address_pair);
733 }
734
Random()735 static double Random() {
736 return static_cast<double>(rand()) / RAND_MAX;
737 }
738
Connect(VirtualSocket * socket,const SocketAddress & remote_addr,bool use_delay)739 int VirtualSocketServer::Connect(VirtualSocket* socket,
740 const SocketAddress& remote_addr,
741 bool use_delay) {
742 uint32 delay = use_delay ? GetRandomTransitDelay() : 0;
743 VirtualSocket* remote = LookupBinding(remote_addr);
744 if (!CanInteractWith(socket, remote)) {
745 LOG(LS_INFO) << "Address family mismatch between "
746 << socket->GetLocalAddress() << " and " << remote_addr;
747 return -1;
748 }
749 if (remote != NULL) {
750 SocketAddress addr = socket->GetLocalAddress();
751 msg_queue_->PostDelayed(delay, remote, MSG_ID_CONNECT,
752 new MessageAddress(addr));
753 } else {
754 LOG(LS_INFO) << "No one listening at " << remote_addr;
755 msg_queue_->PostDelayed(delay, socket, MSG_ID_DISCONNECT);
756 }
757 return 0;
758 }
759
Disconnect(VirtualSocket * socket)760 bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
761 if (socket) {
762 // Remove the mapping.
763 msg_queue_->Post(socket, MSG_ID_DISCONNECT);
764 return true;
765 }
766 return false;
767 }
768
SendUdp(VirtualSocket * socket,const char * data,size_t data_size,const SocketAddress & remote_addr)769 int VirtualSocketServer::SendUdp(VirtualSocket* socket,
770 const char* data, size_t data_size,
771 const SocketAddress& remote_addr) {
772 // See if we want to drop this packet.
773 if (Random() < drop_prob_) {
774 LOG(LS_VERBOSE) << "Dropping packet: bad luck";
775 return static_cast<int>(data_size);
776 }
777
778 VirtualSocket* recipient = LookupBinding(remote_addr);
779 if (!recipient) {
780 // Make a fake recipient for address family checking.
781 scoped_ptr<VirtualSocket> dummy_socket(
782 CreateSocketInternal(AF_INET, SOCK_DGRAM));
783 dummy_socket->SetLocalAddress(remote_addr);
784 if (!CanInteractWith(socket, dummy_socket.get())) {
785 LOG(LS_VERBOSE) << "Incompatible address families: "
786 << socket->GetLocalAddress() << " and " << remote_addr;
787 return -1;
788 }
789 LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
790 return static_cast<int>(data_size);
791 }
792
793 if (!CanInteractWith(socket, recipient)) {
794 LOG(LS_VERBOSE) << "Incompatible address families: "
795 << socket->GetLocalAddress() << " and " << remote_addr;
796 return -1;
797 }
798
799 CritScope cs(&socket->crit_);
800
801 uint32 cur_time = Time();
802 PurgeNetworkPackets(socket, cur_time);
803
804 // Determine whether we have enough bandwidth to accept this packet. To do
805 // this, we need to update the send queue. Once we know it's current size,
806 // we know whether we can fit this packet.
807 //
808 // NOTE: There are better algorithms for maintaining such a queue (such as
809 // "Derivative Random Drop"); however, this algorithm is a more accurate
810 // simulation of what a normal network would do.
811
812 size_t packet_size = data_size + UDP_HEADER_SIZE;
813 if (socket->network_size_ + packet_size > network_capacity_) {
814 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
815 return static_cast<int>(data_size);
816 }
817
818 AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
819 UDP_HEADER_SIZE, false);
820
821 return static_cast<int>(data_size);
822 }
823
SendTcp(VirtualSocket * socket)824 void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
825 // TCP can't send more data than will fill up the receiver's buffer.
826 // We track the data that is in the buffer plus data in flight using the
827 // recipient's recv_buffer_size_. Anything beyond that must be stored in the
828 // sender's buffer. We will trigger the buffered data to be sent when data
829 // is read from the recv_buffer.
830
831 // Lookup the local/remote pair in the connections table.
832 VirtualSocket* recipient = LookupConnection(socket->local_addr_,
833 socket->remote_addr_);
834 if (!recipient) {
835 LOG(LS_VERBOSE) << "Sending data to no one.";
836 return;
837 }
838
839 CritScope cs(&socket->crit_);
840
841 uint32 cur_time = Time();
842 PurgeNetworkPackets(socket, cur_time);
843
844 while (true) {
845 size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_;
846 size_t max_data_size = _min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE);
847 size_t data_size = _min(socket->send_buffer_.size(), max_data_size);
848 if (0 == data_size)
849 break;
850
851 AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0],
852 data_size, TCP_HEADER_SIZE, true);
853 recipient->recv_buffer_size_ += data_size;
854
855 size_t new_buffer_size = socket->send_buffer_.size() - data_size;
856 // Avoid undefined access beyond the last element of the vector.
857 // This only happens when new_buffer_size is 0.
858 if (data_size < socket->send_buffer_.size()) {
859 // memmove is required for potentially overlapping source/destination.
860 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
861 new_buffer_size);
862 }
863 socket->send_buffer_.resize(new_buffer_size);
864 }
865
866 if (socket->write_enabled_
867 && (socket->send_buffer_.size() < send_buffer_capacity_)) {
868 socket->write_enabled_ = false;
869 socket->SignalWriteEvent(socket);
870 }
871 }
872
AddPacketToNetwork(VirtualSocket * sender,VirtualSocket * recipient,uint32 cur_time,const char * data,size_t data_size,size_t header_size,bool ordered)873 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
874 VirtualSocket* recipient,
875 uint32 cur_time,
876 const char* data,
877 size_t data_size,
878 size_t header_size,
879 bool ordered) {
880 VirtualSocket::NetworkEntry entry;
881 entry.size = data_size + header_size;
882
883 sender->network_size_ += entry.size;
884 uint32 send_delay = SendDelay(static_cast<uint32>(sender->network_size_));
885 entry.done_time = cur_time + send_delay;
886 sender->network_.push_back(entry);
887
888 // Find the delay for crossing the many virtual hops of the network.
889 uint32 transit_delay = GetRandomTransitDelay();
890
891 // Post the packet as a message to be delivered (on our own thread)
892 Packet* p = new Packet(data, data_size, sender->local_addr_);
893 uint32 ts = TimeAfter(send_delay + transit_delay);
894 if (ordered) {
895 // Ensure that new packets arrive after previous ones
896 // TODO: consider ordering on a per-socket basis, since this
897 // introduces artifical delay.
898 ts = TimeMax(ts, network_delay_);
899 }
900 msg_queue_->PostAt(ts, recipient, MSG_ID_PACKET, p);
901 network_delay_ = TimeMax(ts, network_delay_);
902 }
903
PurgeNetworkPackets(VirtualSocket * socket,uint32 cur_time)904 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
905 uint32 cur_time) {
906 while (!socket->network_.empty() &&
907 (socket->network_.front().done_time <= cur_time)) {
908 ASSERT(socket->network_size_ >= socket->network_.front().size);
909 socket->network_size_ -= socket->network_.front().size;
910 socket->network_.pop_front();
911 }
912 }
913
SendDelay(uint32 size)914 uint32 VirtualSocketServer::SendDelay(uint32 size) {
915 if (bandwidth_ == 0)
916 return 0;
917 else
918 return 1000 * size / bandwidth_;
919 }
920
921 #if 0
922 void PrintFunction(std::vector<std::pair<double, double> >* f) {
923 return;
924 double sum = 0;
925 for (uint32 i = 0; i < f->size(); ++i) {
926 std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl;
927 sum += (*f)[i].second;
928 }
929 if (!f->empty()) {
930 const double mean = sum / f->size();
931 double sum_sq_dev = 0;
932 for (uint32 i = 0; i < f->size(); ++i) {
933 double dev = (*f)[i].second - mean;
934 sum_sq_dev += dev * dev;
935 }
936 std::cout << "Mean = " << mean << " StdDev = "
937 << sqrt(sum_sq_dev / f->size()) << std::endl;
938 }
939 }
940 #endif // <unused>
941
UpdateDelayDistribution()942 void VirtualSocketServer::UpdateDelayDistribution() {
943 Function* dist = CreateDistribution(delay_mean_, delay_stddev_,
944 delay_samples_);
945 // We take a lock just to make sure we don't leak memory.
946 {
947 CritScope cs(&delay_crit_);
948 delete delay_dist_;
949 delay_dist_ = dist;
950 }
951 }
952
953 static double PI = 4 * atan(1.0);
954
Normal(double x,double mean,double stddev)955 static double Normal(double x, double mean, double stddev) {
956 double a = (x - mean) * (x - mean) / (2 * stddev * stddev);
957 return exp(-a) / (stddev * sqrt(2 * PI));
958 }
959
960 #if 0 // static unused gives a warning
961 static double Pareto(double x, double min, double k) {
962 if (x < min)
963 return 0;
964 else
965 return k * std::pow(min, k) / std::pow(x, k+1);
966 }
967 #endif
968
CreateDistribution(uint32 mean,uint32 stddev,uint32 samples)969 VirtualSocketServer::Function* VirtualSocketServer::CreateDistribution(
970 uint32 mean, uint32 stddev, uint32 samples) {
971 Function* f = new Function();
972
973 if (0 == stddev) {
974 f->push_back(Point(mean, 1.0));
975 } else {
976 double start = 0;
977 if (mean >= 4 * static_cast<double>(stddev))
978 start = mean - 4 * static_cast<double>(stddev);
979 double end = mean + 4 * static_cast<double>(stddev);
980
981 for (uint32 i = 0; i < samples; i++) {
982 double x = start + (end - start) * i / (samples - 1);
983 double y = Normal(x, mean, stddev);
984 f->push_back(Point(x, y));
985 }
986 }
987 return Resample(Invert(Accumulate(f)), 0, 1, samples);
988 }
989
GetRandomTransitDelay()990 uint32 VirtualSocketServer::GetRandomTransitDelay() {
991 size_t index = rand() % delay_dist_->size();
992 double delay = (*delay_dist_)[index].second;
993 //LOG_F(LS_INFO) << "random[" << index << "] = " << delay;
994 return static_cast<uint32>(delay);
995 }
996
997 struct FunctionDomainCmp {
operator ()rtc::FunctionDomainCmp998 bool operator()(const VirtualSocketServer::Point& p1,
999 const VirtualSocketServer::Point& p2) {
1000 return p1.first < p2.first;
1001 }
operator ()rtc::FunctionDomainCmp1002 bool operator()(double v1, const VirtualSocketServer::Point& p2) {
1003 return v1 < p2.first;
1004 }
operator ()rtc::FunctionDomainCmp1005 bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1006 return p1.first < v2;
1007 }
1008 };
1009
Accumulate(Function * f)1010 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1011 ASSERT(f->size() >= 1);
1012 double v = 0;
1013 for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1014 double dx = (*f)[i + 1].first - (*f)[i].first;
1015 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1016 (*f)[i].second = v;
1017 v = v + dx * avgy;
1018 }
1019 (*f)[f->size()-1].second = v;
1020 return f;
1021 }
1022
Invert(Function * f)1023 VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {
1024 for (Function::size_type i = 0; i < f->size(); ++i)
1025 std::swap((*f)[i].first, (*f)[i].second);
1026
1027 std::sort(f->begin(), f->end(), FunctionDomainCmp());
1028 return f;
1029 }
1030
Resample(Function * f,double x1,double x2,uint32 samples)1031 VirtualSocketServer::Function* VirtualSocketServer::Resample(
1032 Function* f, double x1, double x2, uint32 samples) {
1033 Function* g = new Function();
1034
1035 for (size_t i = 0; i < samples; i++) {
1036 double x = x1 + (x2 - x1) * i / (samples - 1);
1037 double y = Evaluate(f, x);
1038 g->push_back(Point(x, y));
1039 }
1040
1041 delete f;
1042 return g;
1043 }
1044
Evaluate(Function * f,double x)1045 double VirtualSocketServer::Evaluate(Function* f, double x) {
1046 Function::iterator iter =
1047 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1048 if (iter == f->begin()) {
1049 return (*f)[0].second;
1050 } else if (iter == f->end()) {
1051 ASSERT(f->size() >= 1);
1052 return (*f)[f->size() - 1].second;
1053 } else if (iter->first == x) {
1054 return iter->second;
1055 } else {
1056 double x1 = (iter - 1)->first;
1057 double y1 = (iter - 1)->second;
1058 double x2 = iter->first;
1059 double y2 = iter->second;
1060 return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1061 }
1062 }
1063
CanInteractWith(VirtualSocket * local,VirtualSocket * remote)1064 bool VirtualSocketServer::CanInteractWith(VirtualSocket* local,
1065 VirtualSocket* remote) {
1066 if (!local || !remote) {
1067 return false;
1068 }
1069 IPAddress local_ip = local->GetLocalAddress().ipaddr();
1070 IPAddress remote_ip = remote->GetLocalAddress().ipaddr();
1071 IPAddress local_normalized = local_ip.Normalized();
1072 IPAddress remote_normalized = remote_ip.Normalized();
1073 // Check if the addresses are the same family after Normalization (turns
1074 // mapped IPv6 address into IPv4 addresses).
1075 // This will stop unmapped V6 addresses from talking to mapped V6 addresses.
1076 if (local_normalized.family() == remote_normalized.family()) {
1077 return true;
1078 }
1079
1080 // If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY.
1081 int remote_v6_only = 0;
1082 remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only);
1083 if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) {
1084 return true;
1085 }
1086 // Same check, backwards.
1087 int local_v6_only = 0;
1088 local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only);
1089 if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) {
1090 return true;
1091 }
1092
1093 // Check to see if either socket was explicitly bound to IPv6-any.
1094 // These sockets can talk with anyone.
1095 if (local_ip.family() == AF_INET6 && local->was_any()) {
1096 return true;
1097 }
1098 if (remote_ip.family() == AF_INET6 && remote->was_any()) {
1099 return true;
1100 }
1101
1102 return false;
1103 }
1104
1105 } // namespace rtc
1106