• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/base/virtualsocketserver.h"
29 
30 #include <errno.h>
31 
32 #include <algorithm>
33 #include <cmath>
34 #include <map>
35 #include <vector>
36 
37 #include "talk/base/common.h"
38 #include "talk/base/logging.h"
39 #include "talk/base/physicalsocketserver.h"
40 #include "talk/base/socketaddresspair.h"
41 #include "talk/base/thread.h"
42 #include "talk/base/timeutils.h"
43 
44 namespace talk_base {
// Seed values for the address counters that VirtualSocketServer's constructor
// copies into next_ipv4_ / next_ipv6_; GetNextIP() hands these out and then
// increments them.
#ifdef WIN32
const in_addr kInitialNextIPv4 = { {0x01, 0, 0, 0} };
#else
// This value is entirely arbitrary, hence the lack of concern about endianness.
const in_addr kInitialNextIPv4 = { 0x01000000 };
#endif
// Starts at ::2 so as to not cause confusion with ::1.
const in6_addr kInitialNextIPv6 = { { {
      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2
    } } };

// Inclusive port range that GetNextPort() cycles through when a socket is
// bound with port 0.
const uint16 kFirstEphemeralPort = 49152;
const uint16 kLastEphemeralPort = 65535;
// Number of ports in the range above; bounds the search for a free port.
const uint16 kEphemeralPortCount = kLastEphemeralPort - kFirstEphemeralPort + 1;
// Default for network_capacity_ (compared against bytes queued per sender).
const uint32 kDefaultNetworkCapacity = 64 * 1024;
// Default for both send_buffer_capacity_ and recv_buffer_capacity_.
const uint32 kDefaultTcpBufferSize = 32 * 1024;

// Per-packet overhead added to the payload size when a packet is put on the
// simulated network.
const uint32 UDP_HEADER_SIZE = 28;  // IP + UDP headers
const uint32 TCP_HEADER_SIZE = 40;  // IP + TCP headers
const uint32 TCP_MSS = 1400;  // Maximum segment size

// Note: The current algorithm doesn't work for sample sizes smaller than this.
const int NUM_SAMPLES = 1000;

// Identifiers of the messages posted to VirtualSocket::OnMessage().
enum {
  MSG_ID_PACKET,      // pdata is a Packet to append to the receive buffer
  MSG_ID_CONNECT,     // pdata is a MessageAddress naming the connecting peer
  MSG_ID_DISCONNECT,  // no payload; the peer went away or refused
};
74 
75 // Packets are passed between sockets as messages.  We copy the data just like
76 // the kernel does.
77 class Packet : public MessageData {
78  public:
Packet(const char * data,size_t size,const SocketAddress & from)79   Packet(const char* data, size_t size, const SocketAddress& from)
80         : size_(size), consumed_(0), from_(from) {
81     ASSERT(NULL != data);
82     data_ = new char[size_];
83     std::memcpy(data_, data, size_);
84   }
85 
~Packet()86   virtual ~Packet() {
87     delete[] data_;
88   }
89 
data() const90   const char* data() const { return data_ + consumed_; }
size() const91   size_t size() const { return size_ - consumed_; }
from() const92   const SocketAddress& from() const { return from_; }
93 
94   // Remove the first size bytes from the data.
Consume(size_t size)95   void Consume(size_t size) {
96     ASSERT(size + consumed_ < size_);
97     consumed_ += size;
98   }
99 
100  private:
101   char* data_;
102   size_t size_, consumed_;
103   SocketAddress from_;
104 };
105 
106 struct MessageAddress : public MessageData {
MessageAddresstalk_base::MessageAddress107   explicit MessageAddress(const SocketAddress& a) : addr(a) { }
108   SocketAddress addr;
109 };
110 
111 // Implements the socket interface using the virtual network.  Packets are
112 // passed as messages using the message queue of the socket server.
113 class VirtualSocket : public AsyncSocket, public MessageHandler {
114  public:
VirtualSocket(VirtualSocketServer * server,int family,int type,bool async)115   VirtualSocket(VirtualSocketServer* server, int family, int type, bool async)
116       : server_(server), family_(family), type_(type), async_(async),
117         state_(CS_CLOSED), error_(0), listen_queue_(NULL),
118         write_enabled_(false),
119         network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) {
120     ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
121     ASSERT(async_ || (type_ != SOCK_STREAM));  // We only support async streams
122   }
123 
~VirtualSocket()124   virtual ~VirtualSocket() {
125     Close();
126 
127     for (RecvBuffer::iterator it = recv_buffer_.begin();
128          it != recv_buffer_.end(); ++it) {
129       delete *it;
130     }
131   }
132 
GetLocalAddress() const133   virtual SocketAddress GetLocalAddress() const {
134     return local_addr_;
135   }
136 
GetRemoteAddress() const137   virtual SocketAddress GetRemoteAddress() const {
138     return remote_addr_;
139   }
140 
141   // Used by server sockets to set the local address without binding.
SetLocalAddress(const SocketAddress & addr)142   void SetLocalAddress(const SocketAddress& addr) {
143     local_addr_ = addr;
144   }
145 
Bind(const SocketAddress & addr)146   virtual int Bind(const SocketAddress& addr) {
147     if (!local_addr_.IsNil()) {
148       error_ = EINVAL;
149       return -1;
150     }
151     local_addr_ = addr;
152     int result = server_->Bind(this, &local_addr_);
153     if (result != 0) {
154       local_addr_.Clear();
155       error_ = EADDRINUSE;
156     } else {
157       bound_ = true;
158       was_any_ = addr.IsAnyIP();
159     }
160     return result;
161   }
162 
Connect(const SocketAddress & addr)163   virtual int Connect(const SocketAddress& addr) {
164     return InitiateConnect(addr, true);
165   }
166 
Close()167   virtual int Close() {
168     if (!local_addr_.IsNil() && bound_) {
169       // Remove from the binding table.
170       server_->Unbind(local_addr_, this);
171       bound_ = false;
172     }
173 
174     if (SOCK_STREAM == type_) {
175       // Cancel pending sockets
176       if (listen_queue_) {
177         while (!listen_queue_->empty()) {
178           SocketAddress addr = listen_queue_->front();
179 
180           // Disconnect listening socket.
181           server_->Disconnect(server_->LookupBinding(addr));
182           listen_queue_->pop_front();
183         }
184         delete listen_queue_;
185         listen_queue_ = NULL;
186       }
187       // Disconnect stream sockets
188       if (CS_CONNECTED == state_) {
189         // Disconnect remote socket, check if it is a child of a server socket.
190         VirtualSocket* socket =
191             server_->LookupConnection(local_addr_, remote_addr_);
192         if (!socket) {
193           // Not a server socket child, then see if it is bound.
194           // TODO: If this is indeed a server socket that has no
195           // children this will cause the server socket to be
196           // closed. This might lead to unexpected results, how to fix this?
197           socket = server_->LookupBinding(remote_addr_);
198         }
199         server_->Disconnect(socket);
200 
201         // Remove mapping for both directions.
202         server_->RemoveConnection(remote_addr_, local_addr_);
203         server_->RemoveConnection(local_addr_, remote_addr_);
204       }
205       // Cancel potential connects
206       MessageList msgs;
207       if (server_->msg_queue_) {
208         server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
209       }
210       for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
211         ASSERT(NULL != it->pdata);
212         MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
213 
214         // Lookup remote side.
215         VirtualSocket* socket = server_->LookupConnection(local_addr_,
216                                                           data->addr);
217         if (socket) {
218           // Server socket, remote side is a socket retreived by
219           // accept. Accepted sockets are not bound so we will not
220           // find it by looking in the bindings table.
221           server_->Disconnect(socket);
222           server_->RemoveConnection(local_addr_, data->addr);
223         } else {
224           server_->Disconnect(server_->LookupBinding(data->addr));
225         }
226         delete data;
227       }
228       // Clear incoming packets and disconnect messages
229       if (server_->msg_queue_) {
230         server_->msg_queue_->Clear(this);
231       }
232     }
233 
234     state_ = CS_CLOSED;
235     local_addr_.Clear();
236     remote_addr_.Clear();
237     return 0;
238   }
239 
Send(const void * pv,size_t cb)240   virtual int Send(const void *pv, size_t cb) {
241     if (CS_CONNECTED != state_) {
242       error_ = ENOTCONN;
243       return -1;
244     }
245     if (SOCK_DGRAM == type_) {
246       return SendUdp(pv, cb, remote_addr_);
247     } else {
248       return SendTcp(pv, cb);
249     }
250   }
251 
SendTo(const void * pv,size_t cb,const SocketAddress & addr)252   virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
253     if (SOCK_DGRAM == type_) {
254       return SendUdp(pv, cb, addr);
255     } else {
256       if (CS_CONNECTED != state_) {
257         error_ = ENOTCONN;
258         return -1;
259       }
260       return SendTcp(pv, cb);
261     }
262   }
263 
Recv(void * pv,size_t cb)264   virtual int Recv(void *pv, size_t cb) {
265     SocketAddress addr;
266     return RecvFrom(pv, cb, &addr);
267   }
268 
RecvFrom(void * pv,size_t cb,SocketAddress * paddr)269   virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
270     // If we don't have a packet, then either error or wait for one to arrive.
271     if (recv_buffer_.empty()) {
272       if (async_) {
273         error_ = EAGAIN;
274         return -1;
275       }
276       while (recv_buffer_.empty()) {
277         Message msg;
278         server_->msg_queue_->Get(&msg);
279         server_->msg_queue_->Dispatch(&msg);
280       }
281     }
282 
283     // Return the packet at the front of the queue.
284     Packet* packet = recv_buffer_.front();
285     size_t data_read = _min(cb, packet->size());
286     std::memcpy(pv, packet->data(), data_read);
287     *paddr = packet->from();
288 
289     if (data_read < packet->size()) {
290       packet->Consume(data_read);
291     } else {
292       recv_buffer_.pop_front();
293       delete packet;
294     }
295 
296     if (SOCK_STREAM == type_) {
297       bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
298       recv_buffer_size_ -= data_read;
299       if (was_full) {
300         VirtualSocket* sender = server_->LookupBinding(remote_addr_);
301         ASSERT(NULL != sender);
302         server_->SendTcp(sender);
303       }
304     }
305 
306     return static_cast<int>(data_read);
307   }
308 
Listen(int backlog)309   virtual int Listen(int backlog) {
310     ASSERT(SOCK_STREAM == type_);
311     ASSERT(CS_CLOSED == state_);
312     if (local_addr_.IsNil()) {
313       error_ = EINVAL;
314       return -1;
315     }
316     ASSERT(NULL == listen_queue_);
317     listen_queue_ = new ListenQueue;
318     state_ = CS_CONNECTING;
319     return 0;
320   }
321 
Accept(SocketAddress * paddr)322   virtual VirtualSocket* Accept(SocketAddress *paddr) {
323     if (NULL == listen_queue_) {
324       error_ = EINVAL;
325       return NULL;
326     }
327     while (!listen_queue_->empty()) {
328       VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_,
329                                                 async_);
330 
331       // Set the new local address to the same as this server socket.
332       socket->SetLocalAddress(local_addr_);
333       // Sockets made from a socket that 'was Any' need to inherit that.
334       socket->set_was_any(was_any_);
335       SocketAddress remote_addr(listen_queue_->front());
336       int result = socket->InitiateConnect(remote_addr, false);
337       listen_queue_->pop_front();
338       if (result != 0) {
339         delete socket;
340         continue;
341       }
342       socket->CompleteConnect(remote_addr, false);
343       if (paddr) {
344         *paddr = remote_addr;
345       }
346       return socket;
347     }
348     error_ = EWOULDBLOCK;
349     return NULL;
350   }
351 
GetError() const352   virtual int GetError() const {
353     return error_;
354   }
355 
SetError(int error)356   virtual void SetError(int error) {
357     error_ = error;
358   }
359 
GetState() const360   virtual ConnState GetState() const {
361     return state_;
362   }
363 
GetOption(Option opt,int * value)364   virtual int GetOption(Option opt, int* value) {
365     OptionsMap::const_iterator it = options_map_.find(opt);
366     if (it == options_map_.end()) {
367       return -1;
368     }
369     *value = it->second;
370     return 0;  // 0 is success to emulate getsockopt()
371   }
372 
SetOption(Option opt,int value)373   virtual int SetOption(Option opt, int value) {
374     options_map_[opt] = value;
375     return 0;  // 0 is success to emulate setsockopt()
376   }
377 
EstimateMTU(uint16 * mtu)378   virtual int EstimateMTU(uint16* mtu) {
379     if (CS_CONNECTED != state_)
380       return ENOTCONN;
381     else
382       return 65536;
383   }
384 
OnMessage(Message * pmsg)385   void OnMessage(Message *pmsg) {
386     if (pmsg->message_id == MSG_ID_PACKET) {
387       //ASSERT(!local_addr_.IsAny());
388       ASSERT(NULL != pmsg->pdata);
389       Packet* packet = static_cast<Packet*>(pmsg->pdata);
390 
391       recv_buffer_.push_back(packet);
392 
393       if (async_) {
394         SignalReadEvent(this);
395       }
396     } else if (pmsg->message_id == MSG_ID_CONNECT) {
397       ASSERT(NULL != pmsg->pdata);
398       MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
399       if (listen_queue_ != NULL) {
400         listen_queue_->push_back(data->addr);
401         if (async_) {
402           SignalReadEvent(this);
403         }
404       } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
405         CompleteConnect(data->addr, true);
406       } else {
407         LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
408         server_->Disconnect(server_->LookupBinding(data->addr));
409       }
410       delete data;
411     } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
412       ASSERT(SOCK_STREAM == type_);
413       if (CS_CLOSED != state_) {
414         int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
415         state_ = CS_CLOSED;
416         remote_addr_.Clear();
417         if (async_) {
418           SignalCloseEvent(this, error);
419         }
420       }
421     } else {
422       ASSERT(false);
423     }
424   }
425 
was_any()426   bool was_any() { return was_any_; }
set_was_any(bool was_any)427   void set_was_any(bool was_any) { was_any_ = was_any; }
428 
429  private:
430   struct NetworkEntry {
431     size_t size;
432     uint32 done_time;
433   };
434 
435   typedef std::deque<SocketAddress> ListenQueue;
436   typedef std::deque<NetworkEntry> NetworkQueue;
437   typedef std::vector<char> SendBuffer;
438   typedef std::list<Packet*> RecvBuffer;
439   typedef std::map<Option, int> OptionsMap;
440 
InitiateConnect(const SocketAddress & addr,bool use_delay)441   int InitiateConnect(const SocketAddress& addr, bool use_delay) {
442     if (!remote_addr_.IsNil()) {
443       error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
444       return -1;
445     }
446     if (local_addr_.IsNil()) {
447       // If there's no local address set, grab a random one in the correct AF.
448       int result = 0;
449       if (addr.ipaddr().family() == AF_INET) {
450         result = Bind(SocketAddress("0.0.0.0", 0));
451       } else if (addr.ipaddr().family() == AF_INET6) {
452         result = Bind(SocketAddress("::", 0));
453       }
454       if (result != 0) {
455         return result;
456       }
457     }
458     if (type_ == SOCK_DGRAM) {
459       remote_addr_ = addr;
460       state_ = CS_CONNECTED;
461     } else {
462       int result = server_->Connect(this, addr, use_delay);
463       if (result != 0) {
464         error_ = EHOSTUNREACH;
465         return -1;
466       }
467       state_ = CS_CONNECTING;
468     }
469     return 0;
470   }
471 
CompleteConnect(const SocketAddress & addr,bool notify)472   void CompleteConnect(const SocketAddress& addr, bool notify) {
473     ASSERT(CS_CONNECTING == state_);
474     remote_addr_ = addr;
475     state_ = CS_CONNECTED;
476     server_->AddConnection(remote_addr_, local_addr_, this);
477     if (async_ && notify) {
478       SignalConnectEvent(this);
479     }
480   }
481 
SendUdp(const void * pv,size_t cb,const SocketAddress & addr)482   int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) {
483     // If we have not been assigned a local port, then get one.
484     if (local_addr_.IsNil()) {
485       local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
486       int result = server_->Bind(this, &local_addr_);
487       if (result != 0) {
488         local_addr_.Clear();
489         error_ = EADDRINUSE;
490         return result;
491       }
492     }
493 
494     // Send the data in a message to the appropriate socket.
495     return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
496   }
497 
SendTcp(const void * pv,size_t cb)498   int SendTcp(const void* pv, size_t cb) {
499     size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
500     if (0 == capacity) {
501       write_enabled_ = true;
502       error_ = EWOULDBLOCK;
503       return -1;
504     }
505     size_t consumed = _min(cb, capacity);
506     const char* cpv = static_cast<const char*>(pv);
507     send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
508     server_->SendTcp(this);
509     return static_cast<int>(consumed);
510   }
511 
512   VirtualSocketServer* server_;
513   int family_;
514   int type_;
515   bool async_;
516   ConnState state_;
517   int error_;
518   SocketAddress local_addr_;
519   SocketAddress remote_addr_;
520 
521   // Pending sockets which can be Accepted
522   ListenQueue* listen_queue_;
523 
524   // Data which tcp has buffered for sending
525   SendBuffer send_buffer_;
526   bool write_enabled_;
527 
528   // Critical section to protect the recv_buffer and queue_
529   CriticalSection crit_;
530 
531   // Network model that enforces bandwidth and capacity constraints
532   NetworkQueue network_;
533   size_t network_size_;
534 
535   // Data which has been received from the network
536   RecvBuffer recv_buffer_;
537   // The amount of data which is in flight or in recv_buffer_
538   size_t recv_buffer_size_;
539 
540   // Is this socket bound?
541   bool bound_;
542 
543   // When we bind a socket to Any, VSS's Bind gives it another address. For
544   // dual-stack sockets, we want to distinguish between sockets that were
545   // explicitly given a particular address and sockets that had one picked
546   // for them by VSS.
547   bool was_any_;
548 
549   // Store the options that are set
550   OptionsMap options_map_;
551 
552   friend class VirtualSocketServer;
553 };
554 
VirtualSocketServer(SocketServer * ss)555 VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
556     : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false),
557       network_delay_(Time()), next_ipv4_(kInitialNextIPv4),
558       next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort),
559       bindings_(new AddressMap()), connections_(new ConnectionMap()),
560       bandwidth_(0), network_capacity_(kDefaultNetworkCapacity),
561       send_buffer_capacity_(kDefaultTcpBufferSize),
562       recv_buffer_capacity_(kDefaultTcpBufferSize),
563       delay_mean_(0), delay_stddev_(0), delay_samples_(NUM_SAMPLES),
564       delay_dist_(NULL), drop_prob_(0.0) {
565   if (!server_) {
566     server_ = new PhysicalSocketServer();
567     server_owned_ = true;
568   }
569   UpdateDelayDistribution();
570 }
571 
~VirtualSocketServer()572 VirtualSocketServer::~VirtualSocketServer() {
573   delete bindings_;
574   delete connections_;
575   delete delay_dist_;
576   if (server_owned_) {
577     delete server_;
578   }
579 }
580 
GetNextIP(int family)581 IPAddress VirtualSocketServer::GetNextIP(int family) {
582   if (family == AF_INET) {
583     IPAddress next_ip(next_ipv4_);
584     next_ipv4_.s_addr =
585         HostToNetwork32(NetworkToHost32(next_ipv4_.s_addr) + 1);
586     return next_ip;
587   } else if (family == AF_INET6) {
588     IPAddress next_ip(next_ipv6_);
589     uint32* as_ints = reinterpret_cast<uint32*>(&next_ipv6_.s6_addr);
590     as_ints[3] += 1;
591     return next_ip;
592   }
593   return IPAddress();
594 }
595 
GetNextPort()596 uint16 VirtualSocketServer::GetNextPort() {
597   uint16 port = next_port_;
598   if (next_port_ < kLastEphemeralPort) {
599     ++next_port_;
600   } else {
601     next_port_ = kFirstEphemeralPort;
602   }
603   return port;
604 }
605 
CreateSocket(int type)606 Socket* VirtualSocketServer::CreateSocket(int type) {
607   return CreateSocket(AF_INET, type);
608 }
609 
CreateSocket(int family,int type)610 Socket* VirtualSocketServer::CreateSocket(int family, int type) {
611   return CreateSocketInternal(family, type);
612 }
613 
CreateAsyncSocket(int type)614 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
615   return CreateAsyncSocket(AF_INET, type);
616 }
617 
CreateAsyncSocket(int family,int type)618 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) {
619   return CreateSocketInternal(family, type);
620 }
621 
CreateSocketInternal(int family,int type)622 VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
623   return new VirtualSocket(this, family, type, true);
624 }
625 
SetMessageQueue(MessageQueue * msg_queue)626 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
627   msg_queue_ = msg_queue;
628   if (msg_queue_) {
629     msg_queue_->SignalQueueDestroyed.connect(this,
630         &VirtualSocketServer::OnMessageQueueDestroyed);
631   }
632 }
633 
Wait(int cmsWait,bool process_io)634 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
635   ASSERT(msg_queue_ == Thread::Current());
636   if (stop_on_idle_ && Thread::Current()->empty()) {
637     return false;
638   }
639   return socketserver()->Wait(cmsWait, process_io);
640 }
641 
WakeUp()642 void VirtualSocketServer::WakeUp() {
643   socketserver()->WakeUp();
644 }
645 
ProcessMessagesUntilIdle()646 bool VirtualSocketServer::ProcessMessagesUntilIdle() {
647   ASSERT(msg_queue_ == Thread::Current());
648   stop_on_idle_ = true;
649   while (!msg_queue_->empty()) {
650     Message msg;
651     if (msg_queue_->Get(&msg, kForever)) {
652       msg_queue_->Dispatch(&msg);
653     }
654   }
655   stop_on_idle_ = false;
656   return !msg_queue_->IsQuitting();
657 }
658 
Bind(VirtualSocket * socket,const SocketAddress & addr)659 int VirtualSocketServer::Bind(VirtualSocket* socket,
660                               const SocketAddress& addr) {
661   ASSERT(NULL != socket);
662   // Address must be completely specified at this point
663   ASSERT(!IPIsUnspec(addr.ipaddr()));
664   ASSERT(addr.port() != 0);
665 
666   // Normalize the address (turns v6-mapped addresses into v4-addresses).
667   SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
668 
669   AddressMap::value_type entry(normalized, socket);
670   return bindings_->insert(entry).second ? 0 : -1;
671 }
672 
Bind(VirtualSocket * socket,SocketAddress * addr)673 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
674   ASSERT(NULL != socket);
675 
676   if (IPIsAny(addr->ipaddr())) {
677     addr->SetIP(GetNextIP(addr->ipaddr().family()));
678   } else if (!IPIsUnspec(addr->ipaddr())) {
679     addr->SetIP(addr->ipaddr().Normalized());
680   } else {
681     ASSERT(false);
682   }
683 
684   if (addr->port() == 0) {
685     for (int i = 0; i < kEphemeralPortCount; ++i) {
686       addr->SetPort(GetNextPort());
687       if (bindings_->find(*addr) == bindings_->end()) {
688         break;
689       }
690     }
691   }
692 
693   return Bind(socket, *addr);
694 }
695 
LookupBinding(const SocketAddress & addr)696 VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) {
697   SocketAddress normalized(addr.ipaddr().Normalized(),
698                            addr.port());
699   AddressMap::iterator it = bindings_->find(normalized);
700   return (bindings_->end() != it) ? it->second : NULL;
701 }
702 
Unbind(const SocketAddress & addr,VirtualSocket * socket)703 int VirtualSocketServer::Unbind(const SocketAddress& addr,
704                                 VirtualSocket* socket) {
705   SocketAddress normalized(addr.ipaddr().Normalized(),
706                            addr.port());
707   ASSERT((*bindings_)[normalized] == socket);
708   bindings_->erase(bindings_->find(normalized));
709   return 0;
710 }
711 
AddConnection(const SocketAddress & local,const SocketAddress & remote,VirtualSocket * remote_socket)712 void VirtualSocketServer::AddConnection(const SocketAddress& local,
713                                         const SocketAddress& remote,
714                                         VirtualSocket* remote_socket) {
715   // Add this socket pair to our routing table. This will allow
716   // multiple clients to connect to the same server address.
717   SocketAddress local_normalized(local.ipaddr().Normalized(),
718                                  local.port());
719   SocketAddress remote_normalized(remote.ipaddr().Normalized(),
720                                   remote.port());
721   SocketAddressPair address_pair(local_normalized, remote_normalized);
722   connections_->insert(std::pair<SocketAddressPair,
723                        VirtualSocket*>(address_pair, remote_socket));
724 }
725 
LookupConnection(const SocketAddress & local,const SocketAddress & remote)726 VirtualSocket* VirtualSocketServer::LookupConnection(
727     const SocketAddress& local,
728     const SocketAddress& remote) {
729   SocketAddress local_normalized(local.ipaddr().Normalized(),
730                                  local.port());
731   SocketAddress remote_normalized(remote.ipaddr().Normalized(),
732                                   remote.port());
733   SocketAddressPair address_pair(local_normalized, remote_normalized);
734   ConnectionMap::iterator it = connections_->find(address_pair);
735   return (connections_->end() != it) ? it->second : NULL;
736 }
737 
RemoveConnection(const SocketAddress & local,const SocketAddress & remote)738 void VirtualSocketServer::RemoveConnection(const SocketAddress& local,
739                                            const SocketAddress& remote) {
740   SocketAddress local_normalized(local.ipaddr().Normalized(),
741                                 local.port());
742   SocketAddress remote_normalized(remote.ipaddr().Normalized(),
743                                  remote.port());
744   SocketAddressPair address_pair(local_normalized, remote_normalized);
745   connections_->erase(address_pair);
746 }
747 
// Returns a pseudo-random value in the half-open range [0, 1).
//
// The divisor is RAND_MAX + 1 so that the result never reaches 1.0; a
// comparison such as Random() < p with p == 1.0 therefore always succeeds.
static double Random() {
  return static_cast<double>(rand()) / (static_cast<double>(RAND_MAX) + 1.0);
}
751 
Connect(VirtualSocket * socket,const SocketAddress & remote_addr,bool use_delay)752 int VirtualSocketServer::Connect(VirtualSocket* socket,
753                                  const SocketAddress& remote_addr,
754                                  bool use_delay) {
755   uint32 delay = use_delay ? GetRandomTransitDelay() : 0;
756   VirtualSocket* remote = LookupBinding(remote_addr);
757   if (!CanInteractWith(socket, remote)) {
758     LOG(LS_INFO) << "Address family mismatch between "
759                  << socket->GetLocalAddress() << " and " << remote_addr;
760     return -1;
761   }
762   if (remote != NULL) {
763     SocketAddress addr = socket->GetLocalAddress();
764     msg_queue_->PostDelayed(delay, remote, MSG_ID_CONNECT,
765                             new MessageAddress(addr));
766   } else {
767     LOG(LS_INFO) << "No one listening at " << remote_addr;
768     msg_queue_->PostDelayed(delay, socket, MSG_ID_DISCONNECT);
769   }
770   return 0;
771 }
772 
Disconnect(VirtualSocket * socket)773 bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
774   if (socket) {
775     // Remove the mapping.
776     msg_queue_->Post(socket, MSG_ID_DISCONNECT);
777     return true;
778   }
779   return false;
780 }
781 
SendUdp(VirtualSocket * socket,const char * data,size_t data_size,const SocketAddress & remote_addr)782 int VirtualSocketServer::SendUdp(VirtualSocket* socket,
783                                  const char* data, size_t data_size,
784                                  const SocketAddress& remote_addr) {
785   // See if we want to drop this packet.
786   if (Random() < drop_prob_) {
787     LOG(LS_VERBOSE) << "Dropping packet: bad luck";
788     return static_cast<int>(data_size);
789   }
790 
791   VirtualSocket* recipient = LookupBinding(remote_addr);
792   if (!recipient) {
793     // Make a fake recipient for address family checking.
794     scoped_ptr<VirtualSocket> dummy_socket(
795         CreateSocketInternal(AF_INET, SOCK_DGRAM));
796     dummy_socket->SetLocalAddress(remote_addr);
797     if (!CanInteractWith(socket, dummy_socket.get())) {
798       LOG(LS_VERBOSE) << "Incompatible address families: "
799                       << socket->GetLocalAddress() << " and " << remote_addr;
800       return -1;
801     }
802     LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
803     return static_cast<int>(data_size);
804   }
805 
806   if (!CanInteractWith(socket, recipient)) {
807     LOG(LS_VERBOSE) << "Incompatible address families: "
808                     << socket->GetLocalAddress() << " and " << remote_addr;
809     return -1;
810   }
811 
812   CritScope cs(&socket->crit_);
813 
814   uint32 cur_time = Time();
815   PurgeNetworkPackets(socket, cur_time);
816 
817   // Determine whether we have enough bandwidth to accept this packet.  To do
818   // this, we need to update the send queue.  Once we know it's current size,
819   // we know whether we can fit this packet.
820   //
821   // NOTE: There are better algorithms for maintaining such a queue (such as
822   // "Derivative Random Drop"); however, this algorithm is a more accurate
823   // simulation of what a normal network would do.
824 
825   size_t packet_size = data_size + UDP_HEADER_SIZE;
826   if (socket->network_size_ + packet_size > network_capacity_) {
827     LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
828     return static_cast<int>(data_size);
829   }
830 
831   AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
832                      UDP_HEADER_SIZE, false);
833 
834   return static_cast<int>(data_size);
835 }
836 
SendTcp(VirtualSocket * socket)837 void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
838   // TCP can't send more data than will fill up the receiver's buffer.
839   // We track the data that is in the buffer plus data in flight using the
840   // recipient's recv_buffer_size_.  Anything beyond that must be stored in the
841   // sender's buffer.  We will trigger the buffered data to be sent when data
842   // is read from the recv_buffer.
843 
844   // Lookup the local/remote pair in the connections table.
845   VirtualSocket* recipient = LookupConnection(socket->local_addr_,
846                                               socket->remote_addr_);
847   if (!recipient) {
848     LOG(LS_VERBOSE) << "Sending data to no one.";
849     return;
850   }
851 
852   CritScope cs(&socket->crit_);
853 
854   uint32 cur_time = Time();
855   PurgeNetworkPackets(socket, cur_time);
856 
857   while (true) {
858     size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_;
859     size_t max_data_size = _min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE);
860     size_t data_size = _min(socket->send_buffer_.size(), max_data_size);
861     if (0 == data_size)
862       break;
863 
864     AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0],
865                        data_size, TCP_HEADER_SIZE, true);
866     recipient->recv_buffer_size_ += data_size;
867 
868     size_t new_buffer_size = socket->send_buffer_.size() - data_size;
869     // Avoid undefined access beyond the last element of the vector.
870     // This only happens when new_buffer_size is 0.
871     if (data_size < socket->send_buffer_.size()) {
872       // memmove is required for potentially overlapping source/destination.
873       memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
874               new_buffer_size);
875     }
876     socket->send_buffer_.resize(new_buffer_size);
877   }
878 
879   if (socket->write_enabled_
880       && (socket->send_buffer_.size() < send_buffer_capacity_)) {
881     socket->write_enabled_ = false;
882     socket->SignalWriteEvent(socket);
883   }
884 }
885 
AddPacketToNetwork(VirtualSocket * sender,VirtualSocket * recipient,uint32 cur_time,const char * data,size_t data_size,size_t header_size,bool ordered)886 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
887                                              VirtualSocket* recipient,
888                                              uint32 cur_time,
889                                              const char* data,
890                                              size_t data_size,
891                                              size_t header_size,
892                                              bool ordered) {
893   VirtualSocket::NetworkEntry entry;
894   entry.size = data_size + header_size;
895 
896   sender->network_size_ += entry.size;
897   uint32 send_delay = SendDelay(static_cast<uint32>(sender->network_size_));
898   entry.done_time = cur_time + send_delay;
899   sender->network_.push_back(entry);
900 
901   // Find the delay for crossing the many virtual hops of the network.
902   uint32 transit_delay = GetRandomTransitDelay();
903 
904   // Post the packet as a message to be delivered (on our own thread)
905   Packet* p = new Packet(data, data_size, sender->local_addr_);
906   uint32 ts = TimeAfter(send_delay + transit_delay);
907   if (ordered) {
908     // Ensure that new packets arrive after previous ones
909     // TODO: consider ordering on a per-socket basis, since this
910     // introduces artifical delay.
911     ts = TimeMax(ts, network_delay_);
912   }
913   msg_queue_->PostAt(ts, recipient, MSG_ID_PACKET, p);
914   network_delay_ = TimeMax(ts, network_delay_);
915 }
916 
PurgeNetworkPackets(VirtualSocket * socket,uint32 cur_time)917 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
918                                               uint32 cur_time) {
919   while (!socket->network_.empty() &&
920          (socket->network_.front().done_time <= cur_time)) {
921     ASSERT(socket->network_size_ >= socket->network_.front().size);
922     socket->network_size_ -= socket->network_.front().size;
923     socket->network_.pop_front();
924   }
925 }
926 
SendDelay(uint32 size)927 uint32 VirtualSocketServer::SendDelay(uint32 size) {
928   if (bandwidth_ == 0)
929     return 0;
930   else
931     return 1000 * size / bandwidth_;
932 }
933 
#if 0
// Debugging aid (compiled out): dumps every point of |f| as "x<TAB>y" and
// then the mean and standard deviation of the y values.
void PrintFunction(std::vector<std::pair<double, double> >* f) {
  return;  // Output is switched off; nothing below this line runs.

  typedef std::vector<std::pair<double, double> > Points;

  double total = 0;
  for (Points::const_iterator it = f->begin(); it != f->end(); ++it) {
    std::cout << it->first << '\t' << it->second << std::endl;
    total += it->second;
  }

  if (f->empty())
    return;

  const double mean = total / f->size();
  double squared_dev_total = 0;
  for (Points::const_iterator it = f->begin(); it != f->end(); ++it) {
    const double dev = it->second - mean;
    squared_dev_total += dev * dev;
  }
  std::cout << "Mean = " << mean << " StdDev = "
            << sqrt(squared_dev_total / f->size()) << std::endl;
}
#endif  // <unused>
954 
UpdateDelayDistribution()955 void VirtualSocketServer::UpdateDelayDistribution() {
956   Function* dist = CreateDistribution(delay_mean_, delay_stddev_,
957                                       delay_samples_);
958   // We take a lock just to make sure we don't leak memory.
959   {
960     CritScope cs(&delay_crit_);
961     delete delay_dist_;
962     delay_dist_ = dist;
963   }
964 }
965 
// Pi, obtained from atan(1) == pi / 4.  Const so that nothing in this file
// can modify it by accident.
static const double PI = 4 * std::atan(1.0);

// Density of the normal distribution with the given |mean| and |stddev|,
// evaluated at |x|:
//   exp(-(x - mean)^2 / (2 * stddev^2)) / (stddev * sqrt(2 * pi))
// |stddev| must be non-zero; the expression divides by it.
static double Normal(double x, double mean, double stddev) {
  const double a = (x - mean) * (x - mean) / (2 * stddev * stddev);
  // std::sqrt rather than ::sqrt: <cmath> only guarantees the std:: name.
  return std::exp(-a) / (stddev * std::sqrt(2 * PI));
}
972 
#if 0  // static unused gives a warning
// Density of the Pareto distribution with minimum |min| and shape |k| at |x|;
// zero below |min|.
static double Pareto(double x, double min, double k) {
  return (x < min) ? 0 : k * std::pow(min, k) / std::pow(x, k + 1);
}
#endif
981 
CreateDistribution(uint32 mean,uint32 stddev,uint32 samples)982 VirtualSocketServer::Function* VirtualSocketServer::CreateDistribution(
983     uint32 mean, uint32 stddev, uint32 samples) {
984   Function* f = new Function();
985 
986   if (0 == stddev) {
987     f->push_back(Point(mean, 1.0));
988   } else {
989     double start = 0;
990     if (mean >= 4 * static_cast<double>(stddev))
991       start = mean - 4 * static_cast<double>(stddev);
992     double end = mean + 4 * static_cast<double>(stddev);
993 
994     for (uint32 i = 0; i < samples; i++) {
995       double x = start + (end - start) * i / (samples - 1);
996       double y = Normal(x, mean, stddev);
997       f->push_back(Point(x, y));
998     }
999   }
1000   return Resample(Invert(Accumulate(f)), 0, 1, samples);
1001 }
1002 
GetRandomTransitDelay()1003 uint32 VirtualSocketServer::GetRandomTransitDelay() {
1004   size_t index = rand() % delay_dist_->size();
1005   double delay = (*delay_dist_)[index].second;
1006   //LOG_F(LS_INFO) << "random[" << index << "] = " << delay;
1007   return static_cast<uint32>(delay);
1008 }
1009 
1010 struct FunctionDomainCmp {
operator ()talk_base::FunctionDomainCmp1011   bool operator()(const VirtualSocketServer::Point& p1,
1012                    const VirtualSocketServer::Point& p2) {
1013     return p1.first < p2.first;
1014   }
operator ()talk_base::FunctionDomainCmp1015   bool operator()(double v1, const VirtualSocketServer::Point& p2) {
1016     return v1 < p2.first;
1017   }
operator ()talk_base::FunctionDomainCmp1018   bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1019     return p1.first < v2;
1020   }
1021 };
1022 
Accumulate(Function * f)1023 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1024   ASSERT(f->size() >= 1);
1025   double v = 0;
1026   for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1027     double dx = (*f)[i + 1].first - (*f)[i].first;
1028     double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1029     (*f)[i].second = v;
1030     v = v + dx * avgy;
1031   }
1032   (*f)[f->size()-1].second = v;
1033   return f;
1034 }
1035 
Invert(Function * f)1036 VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {
1037   for (Function::size_type i = 0; i < f->size(); ++i)
1038     std::swap((*f)[i].first, (*f)[i].second);
1039 
1040   std::sort(f->begin(), f->end(), FunctionDomainCmp());
1041   return f;
1042 }
1043 
Resample(Function * f,double x1,double x2,uint32 samples)1044 VirtualSocketServer::Function* VirtualSocketServer::Resample(
1045     Function* f, double x1, double x2, uint32 samples) {
1046   Function* g = new Function();
1047 
1048   for (size_t i = 0; i < samples; i++) {
1049     double x = x1 + (x2 - x1) * i / (samples - 1);
1050     double y = Evaluate(f, x);
1051     g->push_back(Point(x, y));
1052   }
1053 
1054   delete f;
1055   return g;
1056 }
1057 
Evaluate(Function * f,double x)1058 double VirtualSocketServer::Evaluate(Function* f, double x) {
1059   Function::iterator iter =
1060       std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1061   if (iter == f->begin()) {
1062     return (*f)[0].second;
1063   } else if (iter == f->end()) {
1064     ASSERT(f->size() >= 1);
1065     return (*f)[f->size() - 1].second;
1066   } else if (iter->first == x) {
1067     return iter->second;
1068   } else {
1069     double x1 = (iter - 1)->first;
1070     double y1 = (iter - 1)->second;
1071     double x2 = iter->first;
1072     double y2 = iter->second;
1073     return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1074   }
1075 }
1076 
CanInteractWith(VirtualSocket * local,VirtualSocket * remote)1077 bool VirtualSocketServer::CanInteractWith(VirtualSocket* local,
1078                                           VirtualSocket* remote) {
1079   if (!local || !remote) {
1080     return false;
1081   }
1082   IPAddress local_ip = local->GetLocalAddress().ipaddr();
1083   IPAddress remote_ip = remote->GetLocalAddress().ipaddr();
1084   IPAddress local_normalized = local_ip.Normalized();
1085   IPAddress remote_normalized = remote_ip.Normalized();
1086   // Check if the addresses are the same family after Normalization (turns
1087   // mapped IPv6 address into IPv4 addresses).
1088   // This will stop unmapped V6 addresses from talking to mapped V6 addresses.
1089   if (local_normalized.family() == remote_normalized.family()) {
1090     return true;
1091   }
1092 
1093   // If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY.
1094   int remote_v6_only = 0;
1095   remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only);
1096   if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) {
1097     return true;
1098   }
1099   // Same check, backwards.
1100   int local_v6_only = 0;
1101   local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only);
1102   if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) {
1103     return true;
1104   }
1105 
1106   // Check to see if either socket was explicitly bound to IPv6-any.
1107   // These sockets can talk with anyone.
1108   if (local_ip.family() == AF_INET6 && local->was_any()) {
1109     return true;
1110   }
1111   if (remote_ip.family() == AF_INET6 && remote->was_any()) {
1112     return true;
1113   }
1114 
1115   return false;
1116 }
1117 
1118 }  // namespace talk_base
1119