/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.testing.integration;
18 
19 import static org.junit.Assert.assertEquals;
20 
21 import com.google.common.io.ByteStreams;
22 import io.netty.util.concurrent.DefaultThreadFactory;
23 import java.io.DataInputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.net.ServerSocket;
28 import java.net.Socket;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.AfterClass;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.JUnit4;
40 
41 @RunWith(JUnit4.class)
42 public class ProxyTest {
43 
44   private static ThreadPoolExecutor executor =
45       new ThreadPoolExecutor(8, 8, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
46           new DefaultThreadFactory("proxy-test-pool", true));
47 
48   private TrafficControlProxy proxy;
49   private Socket client;
50   private Server server;
51 
52   @AfterClass
stopExecutor()53   public static void stopExecutor() {
54     executor.shutdown();
55   }
56 
57   @After
shutdownTest()58   public void shutdownTest() throws IOException {
59     proxy.shutDown();
60     server.shutDown();
61     client.close();
62   }
63 
64   @Test
65   @org.junit.Ignore // flaky. latency commonly too high
smallLatency()66   public void smallLatency() throws Exception {
67     server = new Server();
68     int serverPort = server.init();
69     executor.execute(server);
70 
71     int latency = (int) TimeUnit.MILLISECONDS.toNanos(50);
72     proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS);
73     proxy.start();
74     client = new Socket("localhost", proxy.getPort());
75     client.setReuseAddress(true);
76     OutputStream clientOut = client.getOutputStream();
77     DataInputStream clientIn = new DataInputStream(client.getInputStream());
78     byte[] message = new byte[1];
79 
80     // warmup
81     for (int i = 0; i < 5; i++) {
82       clientOut.write(message, 0, 1);
83     }
84     clientIn.readFully(new byte[5]);
85 
86     // test
87     List<Long> rtts = new ArrayList<>();
88     for (int i = 0; i < 3; i++) {
89       long start = System.nanoTime();
90       clientOut.write(message, 0, 1);
91       clientIn.read(message);
92       rtts.add(System.nanoTime() - start);
93     }
94     Collections.sort(rtts);
95     long rtt = rtts.get(0);
96     assertEquals(latency, (double) rtt, .5 * latency);
97   }
98 
99   @Test
bigLatency()100   public void bigLatency() throws Exception {
101     server = new Server();
102     int serverPort = server.init();
103     executor.execute(server);
104 
105     int latency = (int) TimeUnit.MILLISECONDS.toNanos(250);
106     proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS);
107     proxy.start();
108     client = new Socket("localhost", proxy.getPort());
109     OutputStream clientOut = client.getOutputStream();
110     DataInputStream clientIn = new DataInputStream(client.getInputStream());
111     byte[] message = new byte[1];
112 
113     // warmup
114     for (int i = 0; i < 5; i++) {
115       clientOut.write(message, 0, 1);
116     }
117     clientIn.readFully(new byte[5]);
118 
119     // test
120     List<Long> rtts = new ArrayList<>();
121     for (int i = 0; i < 2; i++) {
122       long start = System.nanoTime();
123       clientOut.write(message, 0, 1);
124       clientIn.read(message);
125       rtts.add(System.nanoTime() - start);
126     }
127     Collections.sort(rtts);
128     long rtt = rtts.get(0);
129     assertEquals(latency, (double) rtt, .5 * latency);
130   }
131 
132   @Test
smallBandwidth()133   public void smallBandwidth() throws Exception {
134     server = new Server();
135     int serverPort = server.init();
136     server.setMode("stream");
137     executor.execute(server);
138 
139     int bandwidth = 64 * 1024;
140     proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS);
141     proxy.start();
142     client = new Socket("localhost", proxy.getPort());
143     DataInputStream clientIn = new DataInputStream(client.getInputStream());
144 
145     clientIn.readFully(new byte[100 * 1024]);
146     int sample = bandwidth / 5;
147     List<Double> bandwidths = new ArrayList<>();
148     for (int i = 0; i < 5; i++) {
149       long start = System.nanoTime();
150       clientIn.readFully(new byte[sample]);
151       long duration = System.nanoTime() - start;
152       double actualBandwidth = sample / (((double) duration) / TimeUnit.SECONDS.toNanos(1));
153       bandwidths.add(actualBandwidth);
154     }
155     Collections.sort(bandwidths);
156     double bandUsed = bandwidths.get(bandwidths.size() - 1);
157     assertEquals(bandwidth, bandUsed, .5 * bandwidth);
158   }
159 
160   @Test
largeBandwidth()161   public void largeBandwidth() throws Exception {
162     server = new Server();
163     int serverPort = server.init();
164     server.setMode("stream");
165     executor.execute(server);
166     int bandwidth = 10 * 1024 * 1024;
167     proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS);
168     proxy.start();
169     client = new Socket("localhost", proxy.getPort());
170     DataInputStream clientIn = new DataInputStream(client.getInputStream());
171 
172     clientIn.readFully(new byte[100 * 1024]);
173     int sample = bandwidth / 5;
174     List<Double> bandwidths = new ArrayList<>();
175     for (int i = 0; i < 5; i++) {
176       long start = System.nanoTime();
177       clientIn.readFully(new byte[sample]);
178       long duration = System.nanoTime() - start;
179       double actualBandwidth = sample / (((double) duration) / TimeUnit.SECONDS.toNanos(1));
180       bandwidths.add(actualBandwidth);
181     }
182     Collections.sort(bandwidths);
183     double bandUsed = bandwidths.get(bandwidths.size() - 1);
184     assertEquals(bandwidth, bandUsed, .5 * bandwidth);
185   }
186 
187   // server with echo and streaming modes
188   private static class Server implements Runnable {
189     private ServerSocket server;
190     private String mode = "echo";
191 
setMode(String mode)192     public void setMode(String mode) {
193       this.mode = mode;
194     }
195 
196     /**
197      * Initializes server and returns its listening port.
198      */
init()199     public int init() throws IOException {
200       server = new ServerSocket(0);
201       return server.getLocalPort();
202     }
203 
shutDown()204     public void shutDown() throws IOException {
205       server.close();
206     }
207 
208     @Override
run()209     public void run() {
210       try {
211         Socket rcv = server.accept();
212         try {
213           handleSocket(rcv);
214         } finally {
215           rcv.close();
216         }
217       } catch (IOException e) {
218         throw new RuntimeException(e);
219       }
220     }
221 
handleSocket(Socket rcv)222     private void handleSocket(Socket rcv) throws IOException {
223       InputStream serverIn = rcv.getInputStream();
224       OutputStream serverOut = rcv.getOutputStream();
225       if (mode.equals("echo")) {
226         ByteStreams.copy(serverIn, serverOut);
227       } else if (mode.equals("stream")) {
228         byte[] message = new byte[1024];
229         while (true) {
230           try {
231             serverOut.write(message);
232           } catch (IOException ignored) {
233             // Client closed
234             break;
235           }
236         }
237       } else {
238         throw new RuntimeException("Unknown mode: use 'echo' or 'stream'");
239       }
240     }
241   }
242 }
243