• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // MSVC++ requires this to be set before any other includes to get M_PI.
6 #define _USE_MATH_DEFINES
7 
8 #include "remoting/test/fake_socket_factory.h"
9 
10 #include <math.h>
11 
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/location.h"
15 #include "base/rand_util.h"
16 #include "base/single_thread_task_runner.h"
17 #include "base/thread_task_runner_handle.h"
18 #include "net/base/io_buffer.h"
19 #include "remoting/test/leaky_bucket.h"
20 #include "third_party/webrtc/base/asyncpacketsocket.h"
21 
22 namespace remoting {
23 
24 namespace {
25 
26 const int kPortRangeStart = 1024;
27 const int kPortRangeEnd = 65535;
28 
GetNormalRandom(double average,double stddev)29 double GetNormalRandom(double average, double stddev) {
30   // Based on Box-Muller transform, see
31   // http://en.wikipedia.org/wiki/Box_Muller_transform .
32   return average +
33          stddev * sqrt(-2.0 * log(1.0 - base::RandDouble())) *
34              cos(base::RandDouble() * 2.0 * M_PI);
35 }
36 
37 class FakeUdpSocket : public rtc::AsyncPacketSocket {
38  public:
39   FakeUdpSocket(FakePacketSocketFactory* factory,
40                 scoped_refptr<FakeNetworkDispatcher> dispatcher,
41                 const rtc::SocketAddress& local_address);
42   virtual ~FakeUdpSocket();
43 
44   void ReceivePacket(const rtc::SocketAddress& from,
45                      const rtc::SocketAddress& to,
46                      const scoped_refptr<net::IOBuffer>& data,
47                      int data_size);
48 
49   // rtc::AsyncPacketSocket interface.
50   virtual rtc::SocketAddress GetLocalAddress() const OVERRIDE;
51   virtual rtc::SocketAddress GetRemoteAddress() const OVERRIDE;
52   virtual int Send(const void* data, size_t data_size,
53                    const rtc::PacketOptions& options) OVERRIDE;
54   virtual int SendTo(const void* data, size_t data_size,
55                      const rtc::SocketAddress& address,
56                      const rtc::PacketOptions& options) OVERRIDE;
57   virtual int Close() OVERRIDE;
58   virtual State GetState() const OVERRIDE;
59   virtual int GetOption(rtc::Socket::Option option, int* value) OVERRIDE;
60   virtual int SetOption(rtc::Socket::Option option, int value) OVERRIDE;
61   virtual int GetError() const OVERRIDE;
62   virtual void SetError(int error) OVERRIDE;
63 
64  private:
65   FakePacketSocketFactory* factory_;
66   scoped_refptr<FakeNetworkDispatcher> dispatcher_;
67   rtc::SocketAddress local_address_;
68   State state_;
69 
70   DISALLOW_COPY_AND_ASSIGN(FakeUdpSocket);
71 };
72 
FakeUdpSocket(FakePacketSocketFactory * factory,scoped_refptr<FakeNetworkDispatcher> dispatcher,const rtc::SocketAddress & local_address)73 FakeUdpSocket::FakeUdpSocket(FakePacketSocketFactory* factory,
74                              scoped_refptr<FakeNetworkDispatcher> dispatcher,
75                              const rtc::SocketAddress& local_address)
76     : factory_(factory),
77       dispatcher_(dispatcher),
78       local_address_(local_address),
79       state_(STATE_BOUND) {
80 }
81 
~FakeUdpSocket()82 FakeUdpSocket::~FakeUdpSocket() {
83   factory_->OnSocketDestroyed(local_address_.port());
84 }
85 
ReceivePacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,const scoped_refptr<net::IOBuffer> & data,int data_size)86 void FakeUdpSocket::ReceivePacket(const rtc::SocketAddress& from,
87                                   const rtc::SocketAddress& to,
88                                   const scoped_refptr<net::IOBuffer>& data,
89                                   int data_size) {
90   SignalReadPacket(
91       this, data->data(), data_size, from, rtc::CreatePacketTime(0));
92 }
93 
GetLocalAddress() const94 rtc::SocketAddress FakeUdpSocket::GetLocalAddress() const {
95   return local_address_;
96 }
97 
GetRemoteAddress() const98 rtc::SocketAddress FakeUdpSocket::GetRemoteAddress() const {
99   NOTREACHED();
100   return rtc::SocketAddress();
101 }
102 
Send(const void * data,size_t data_size,const rtc::PacketOptions & options)103 int FakeUdpSocket::Send(const void* data, size_t data_size,
104                         const rtc::PacketOptions& options) {
105   NOTREACHED();
106   return EINVAL;
107 }
108 
SendTo(const void * data,size_t data_size,const rtc::SocketAddress & address,const rtc::PacketOptions & options)109 int FakeUdpSocket::SendTo(const void* data, size_t data_size,
110                           const rtc::SocketAddress& address,
111                           const rtc::PacketOptions& options) {
112   scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(data_size);
113   memcpy(buffer->data(), data, data_size);
114   dispatcher_->DeliverPacket(local_address_, address, buffer, data_size);
115   return data_size;
116 }
117 
Close()118 int FakeUdpSocket::Close() {
119   state_ = STATE_CLOSED;
120   return 0;
121 }
122 
GetState() const123 rtc::AsyncPacketSocket::State FakeUdpSocket::GetState() const {
124   return state_;
125 }
126 
GetOption(rtc::Socket::Option option,int * value)127 int FakeUdpSocket::GetOption(rtc::Socket::Option option, int* value) {
128   NOTIMPLEMENTED();
129   return -1;
130 }
131 
SetOption(rtc::Socket::Option option,int value)132 int FakeUdpSocket::SetOption(rtc::Socket::Option option, int value) {
133   NOTIMPLEMENTED();
134   return -1;
135 }
136 
GetError() const137 int FakeUdpSocket::GetError() const {
138   return 0;
139 }
140 
SetError(int error)141 void FakeUdpSocket::SetError(int error) {
142   NOTREACHED();
143 }
144 
145 }  // namespace
146 
PendingPacket()147 FakePacketSocketFactory::PendingPacket::PendingPacket()
148     : data_size(0) {
149 }
150 
PendingPacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,const scoped_refptr<net::IOBuffer> & data,int data_size)151 FakePacketSocketFactory::PendingPacket::PendingPacket(
152     const rtc::SocketAddress& from,
153     const rtc::SocketAddress& to,
154     const scoped_refptr<net::IOBuffer>& data,
155     int data_size)
156     : from(from), to(to), data(data), data_size(data_size) {
157 }
158 
~PendingPacket()159 FakePacketSocketFactory::PendingPacket::~PendingPacket() {
160 }
161 
FakePacketSocketFactory(FakeNetworkDispatcher * dispatcher)162 FakePacketSocketFactory::FakePacketSocketFactory(
163     FakeNetworkDispatcher* dispatcher)
164     : task_runner_(base::ThreadTaskRunnerHandle::Get()),
165       dispatcher_(dispatcher),
166       address_(dispatcher_->AllocateAddress()),
167       out_of_order_rate_(0.0),
168       next_port_(kPortRangeStart),
169       weak_factory_(this) {
170   dispatcher_->AddNode(this);
171 }
172 
~FakePacketSocketFactory()173 FakePacketSocketFactory::~FakePacketSocketFactory() {
174   CHECK(udp_sockets_.empty());
175   dispatcher_->RemoveNode(this);
176 }
177 
OnSocketDestroyed(int port)178 void FakePacketSocketFactory::OnSocketDestroyed(int port) {
179   DCHECK(task_runner_->BelongsToCurrentThread());
180   udp_sockets_.erase(port);
181 }
182 
SetBandwidth(int bandwidth,int max_buffer)183 void FakePacketSocketFactory::SetBandwidth(int bandwidth, int max_buffer) {
184   DCHECK(task_runner_->BelongsToCurrentThread());
185   if (bandwidth <= 0) {
186     leaky_bucket_.reset();
187   } else {
188     leaky_bucket_.reset(new LeakyBucket(max_buffer, bandwidth));
189   }
190 }
191 
SetLatency(base::TimeDelta average,base::TimeDelta stddev)192 void FakePacketSocketFactory::SetLatency(base::TimeDelta average,
193                                          base::TimeDelta stddev) {
194   DCHECK(task_runner_->BelongsToCurrentThread());
195   latency_average_ = average;
196   latency_stddev_ = stddev;
197 }
198 
CreateUdpSocket(const rtc::SocketAddress & local_address,int min_port,int max_port)199 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateUdpSocket(
200     const rtc::SocketAddress& local_address,
201     int min_port, int max_port) {
202   DCHECK(task_runner_->BelongsToCurrentThread());
203 
204   int port = -1;
205   if (min_port > 0 && max_port > 0) {
206     for (int i = min_port; i <= max_port; ++i) {
207       if (udp_sockets_.find(i) == udp_sockets_.end()) {
208         port = i;
209         break;
210       }
211     }
212     if (port < 0)
213       return NULL;
214   } else {
215     do {
216       port = next_port_;
217       next_port_ =
218           (next_port_ >= kPortRangeEnd) ? kPortRangeStart : (next_port_ + 1);
219     } while (udp_sockets_.find(port) != udp_sockets_.end());
220   }
221 
222   CHECK(local_address.ipaddr() == address_);
223 
224   FakeUdpSocket* result =
225       new FakeUdpSocket(this, dispatcher_,
226                         rtc::SocketAddress(local_address.ipaddr(), port));
227 
228   udp_sockets_[port] =
229       base::Bind(&FakeUdpSocket::ReceivePacket, base::Unretained(result));
230 
231   return result;
232 }
233 
CreateServerTcpSocket(const rtc::SocketAddress & local_address,int min_port,int max_port,int opts)234 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateServerTcpSocket(
235     const rtc::SocketAddress& local_address,
236     int min_port, int max_port,
237     int opts) {
238   return NULL;
239 }
240 
CreateClientTcpSocket(const rtc::SocketAddress & local_address,const rtc::SocketAddress & remote_address,const rtc::ProxyInfo & proxy_info,const std::string & user_agent,int opts)241 rtc::AsyncPacketSocket* FakePacketSocketFactory::CreateClientTcpSocket(
242     const rtc::SocketAddress& local_address,
243     const rtc::SocketAddress& remote_address,
244     const rtc::ProxyInfo& proxy_info,
245     const std::string& user_agent,
246     int opts) {
247   return NULL;
248 }
249 
250 rtc::AsyncResolverInterface*
CreateAsyncResolver()251 FakePacketSocketFactory::CreateAsyncResolver() {
252   return NULL;
253 }
254 
255 const scoped_refptr<base::SingleThreadTaskRunner>&
GetThread() const256 FakePacketSocketFactory::GetThread() const {
257   return task_runner_;
258 }
259 
GetAddress() const260 const rtc::IPAddress& FakePacketSocketFactory::GetAddress() const {
261   return address_;
262 }
263 
ReceivePacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,const scoped_refptr<net::IOBuffer> & data,int data_size)264 void FakePacketSocketFactory::ReceivePacket(
265     const rtc::SocketAddress& from,
266     const rtc::SocketAddress& to,
267     const scoped_refptr<net::IOBuffer>& data,
268     int data_size) {
269   DCHECK(task_runner_->BelongsToCurrentThread());
270   DCHECK(to.ipaddr() == address_);
271 
272   base::TimeDelta delay;
273 
274   if (leaky_bucket_) {
275     delay = leaky_bucket_->AddPacket(data_size);
276     if (delay.is_max()) {
277       // Drop the packet.
278       return;
279     }
280   }
281 
282   if (latency_average_ > base::TimeDelta()) {
283     delay += base::TimeDelta::FromMillisecondsD(
284         GetNormalRandom(latency_average_.InMillisecondsF(),
285                         latency_stddev_.InMillisecondsF()));
286   }
287   if (delay < base::TimeDelta())
288     delay = base::TimeDelta();
289 
290   // Put the packet to the |pending_packets_| and post a task for
291   // DoReceivePackets(). Note that the DoReceivePackets() task posted here may
292   // deliver a different packet, not the one added to the queue here. This
293   // would happen if another task gets posted with a shorted delay or when
294   // |out_of_order_rate_| is greater than 0. It's implemented this way to
295   // decouple latency variability from out-of-order delivery.
296   PendingPacket packet(from, to, data, data_size);
297   pending_packets_.push_back(packet);
298   task_runner_->PostDelayedTask(
299       FROM_HERE,
300       base::Bind(&FakePacketSocketFactory::DoReceivePacket,
301                  weak_factory_.GetWeakPtr()),
302       delay);
303 }
304 
DoReceivePacket()305 void FakePacketSocketFactory::DoReceivePacket() {
306   DCHECK(task_runner_->BelongsToCurrentThread());
307 
308   PendingPacket packet;
309   if (pending_packets_.size() > 1 && base::RandDouble() < out_of_order_rate_) {
310     std::list<PendingPacket>::iterator it = pending_packets_.begin();
311     ++it;
312     packet = *it;
313     pending_packets_.erase(it);
314   } else {
315     packet = pending_packets_.front();
316     pending_packets_.pop_front();
317   }
318 
319   UdpSocketsMap::iterator iter = udp_sockets_.find(packet.to.port());
320   if (iter == udp_sockets_.end()) {
321     // Invalid port number.
322     return;
323   }
324 
325   iter->second.Run(packet.from, packet.to, packet.data, packet.data_size);
326 }
327 
328 }  // namespace remoting
329