• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static org.junit.Assert.assertEquals;
20 
21 import io.netty.util.concurrent.DefaultThreadFactory;
22 import java.io.DataInputStream;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.net.ServerSocket;
26 import java.net.Socket;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.AfterClass;
33 import org.junit.Test;
34 import org.junit.runner.RunWith;
35 import org.junit.runners.JUnit4;
36 
37 @RunWith(JUnit4.class)
38 public class ProxyTest {
39 
40   private static ThreadPoolExecutor executor =
41       new ThreadPoolExecutor(8, 8, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
42           new DefaultThreadFactory("proxy-test-pool", true));
43 
44   private TrafficControlProxy proxy;
45   private Socket client;
46   private Server server;
47 
48   @AfterClass
stopExecutor()49   public static void stopExecutor() {
50     executor.shutdown();
51   }
52 
53   @After
shutdownTest()54   public void shutdownTest() throws IOException {
55     proxy.shutDown();
56     server.shutDown();
57     client.close();
58   }
59 
60   @Test
smallLatency()61   public void smallLatency() throws Exception {
62     server = new Server();
63     int serverPort = server.init();
64     executor.execute(server);
65 
66     int latency = (int) TimeUnit.MILLISECONDS.toNanos(50);
67     proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS);
68     startProxy(proxy).get();
69     client = new Socket("localhost", proxy.getPort());
70     client.setReuseAddress(true);
71     DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
72     DataInputStream clientIn = new DataInputStream(client.getInputStream());
73     byte[] message = new byte[1];
74 
75     // warmup
76     for (int i = 0; i < 5; i++) {
77       clientOut.write(message, 0, 1);
78     }
79     clientIn.readFully(new byte[5]);
80 
81     // test
82     long start = System.nanoTime();
83     clientOut.write(message, 0, 1);
84     clientIn.read(message);
85     long stop = System.nanoTime();
86 
87     long rtt = (stop - start);
88     assertEquals(latency, rtt, latency);
89   }
90 
91   @Test
bigLatency()92   public void bigLatency() throws Exception {
93     server = new Server();
94     int serverPort = server.init();
95     executor.execute(server);
96 
97     int latency = (int) TimeUnit.MILLISECONDS.toNanos(250);
98     proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS);
99     startProxy(proxy).get();
100     client = new Socket("localhost", proxy.getPort());
101     DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
102     DataInputStream clientIn = new DataInputStream(client.getInputStream());
103     byte[] message = new byte[1];
104 
105     // warmup
106     for (int i = 0; i < 5; i++) {
107       clientOut.write(message, 0, 1);
108     }
109     clientIn.readFully(new byte[5]);
110 
111     // test
112     long start = System.nanoTime();
113     clientOut.write(message, 0, 1);
114     clientIn.read(message);
115     long stop = System.nanoTime();
116 
117     long rtt = (stop - start);
118     assertEquals(latency, rtt, latency);
119   }
120 
121   @Test
smallBandwidth()122   public void smallBandwidth() throws Exception {
123     server = new Server();
124     int serverPort = server.init();
125     server.setMode("stream");
126     executor.execute(server);
127     assertEquals("stream", server.mode());
128 
129     int bandwidth = 64 * 1024;
130     proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS);
131     startProxy(proxy).get();
132     client = new Socket("localhost", proxy.getPort());
133     DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
134     DataInputStream clientIn = new DataInputStream(client.getInputStream());
135 
136     clientOut.write(new byte[1]);
137     clientIn.readFully(new byte[100 * 1024]);
138     long start = System.nanoTime();
139     clientIn.readFully(new byte[5 * bandwidth]);
140     long stop = System.nanoTime();
141 
142     long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
143     assertEquals(bandwidth, bandUsed, .5 * bandwidth);
144   }
145 
146   @Test
largeBandwidth()147   public void largeBandwidth() throws Exception {
148     server = new Server();
149     int serverPort = server.init();
150     server.setMode("stream");
151     executor.execute(server);
152     assertEquals("stream", server.mode());
153     int bandwidth = 10 * 1024 * 1024;
154     proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS);
155     startProxy(proxy).get();
156     client = new Socket("localhost", proxy.getPort());
157     DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
158     DataInputStream clientIn = new DataInputStream(client.getInputStream());
159 
160     clientOut.write(new byte[1]);
161     clientIn.readFully(new byte[100 * 1024]);
162     long start = System.nanoTime();
163     clientIn.readFully(new byte[5 * bandwidth]);
164     long stop = System.nanoTime();
165 
166     long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
167     assertEquals(bandwidth, bandUsed, .5 * bandwidth);
168   }
169 
startProxy(final TrafficControlProxy p)170   private Future<?> startProxy(final TrafficControlProxy p) {
171     return executor.submit(new Runnable() {
172       @Override
173       public void run() {
174         try {
175           p.start();
176         } catch (Exception e) {
177           throw new RuntimeException(e);
178         }
179       }
180     });
181   }
182 
183   // server with echo and streaming modes
184   private static class Server implements Runnable {
185     private ServerSocket server;
186     private Socket rcv;
187     private boolean shutDown;
188     private String mode = "echo";
189 
190     public void setMode(String mode) {
191       this.mode = mode;
192     }
193 
194     public String mode() {
195       return mode;
196     }
197 
198     /**
199      * Initializes server and returns its listening port.
200      */
201     public int init() throws IOException {
202       server = new ServerSocket(0);
203       return server.getLocalPort();
204     }
205 
206     public void shutDown() {
207       try {
208         server.close();
209         rcv.close();
210         shutDown = true;
211       } catch (IOException e) {
212         shutDown = true;
213       }
214     }
215 
216     @Override
217     public void run() {
218       try {
219         rcv = server.accept();
220         DataInputStream serverIn = new DataInputStream(rcv.getInputStream());
221         DataOutputStream serverOut = new DataOutputStream(rcv.getOutputStream());
222         byte[] response = new byte[1024];
223         if (mode.equals("echo")) {
224           while (!shutDown) {
225             int readable = serverIn.read(response);
226             serverOut.write(response, 0, readable);
227           }
228         } else if (mode.equals("stream")) {
229           serverIn.read(response);
230           byte[] message = new byte[16 * 1024];
231           while (!shutDown) {
232             serverOut.write(message, 0, message.length);
233           }
234           serverIn.close();
235           serverOut.close();
236           rcv.close();
237         } else {
238           System.out.println("Unknown mode: use 'echo' or 'stream'");
239         }
240       } catch (IOException e) {
241         throw new RuntimeException(e);
242       }
243     }
244   }
245 }
246