1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static org.junit.Assert.assertEquals; 20 21 import com.google.common.io.ByteStreams; 22 import io.netty.util.concurrent.DefaultThreadFactory; 23 import java.io.DataInputStream; 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.OutputStream; 27 import java.net.ServerSocket; 28 import java.net.Socket; 29 import java.util.ArrayList; 30 import java.util.Collections; 31 import java.util.List; 32 import java.util.concurrent.LinkedBlockingQueue; 33 import java.util.concurrent.ThreadPoolExecutor; 34 import java.util.concurrent.TimeUnit; 35 import org.junit.After; 36 import org.junit.AfterClass; 37 import org.junit.Test; 38 import org.junit.runner.RunWith; 39 import org.junit.runners.JUnit4; 40 41 @RunWith(JUnit4.class) 42 public class ProxyTest { 43 44 private static ThreadPoolExecutor executor = 45 new ThreadPoolExecutor(8, 8, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 46 new DefaultThreadFactory("proxy-test-pool", true)); 47 48 private TrafficControlProxy proxy; 49 private Socket client; 50 private Server server; 51 52 @AfterClass stopExecutor()53 public static void stopExecutor() { 54 executor.shutdown(); 55 } 56 57 @After shutdownTest()58 public void shutdownTest() throws IOException { 59 proxy.shutDown(); 60 server.shutDown(); 61 client.close(); 62 } 63 64 @Test 65 @org.junit.Ignore // flaky. latency commonly too high smallLatency()66 public void smallLatency() throws Exception { 67 server = new Server(); 68 int serverPort = server.init(); 69 executor.execute(server); 70 71 int latency = (int) TimeUnit.MILLISECONDS.toNanos(50); 72 proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS); 73 proxy.start(); 74 client = new Socket("localhost", proxy.getPort()); 75 client.setReuseAddress(true); 76 OutputStream clientOut = client.getOutputStream(); 77 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 78 byte[] message = new byte[1]; 79 80 // warmup 81 for (int i = 0; i < 5; i++) { 82 clientOut.write(message, 0, 1); 83 } 84 clientIn.readFully(new byte[5]); 85 86 // test 87 List<Long> rtts = new ArrayList<>(); 88 for (int i = 0; i < 3; i++) { 89 long start = System.nanoTime(); 90 clientOut.write(message, 0, 1); 91 clientIn.read(message); 92 rtts.add(System.nanoTime() - start); 93 } 94 Collections.sort(rtts); 95 long rtt = rtts.get(0); 96 assertEquals(latency, (double) rtt, .5 * latency); 97 } 98 99 @Test bigLatency()100 public void bigLatency() throws Exception { 101 server = new Server(); 102 int serverPort = server.init(); 103 executor.execute(server); 104 105 int latency = (int) TimeUnit.MILLISECONDS.toNanos(250); 106 proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS); 107 proxy.start(); 108 client = new Socket("localhost", proxy.getPort()); 109 OutputStream clientOut = client.getOutputStream(); 110 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 111 byte[] message = new byte[1]; 112 113 // warmup 114 for (int i = 0; i < 5; i++) { 115 clientOut.write(message, 0, 1); 116 } 117 clientIn.readFully(new byte[5]); 118 119 // test 120 List<Long> rtts = new ArrayList<>(); 121 for (int i = 0; i < 2; i++) { 122 long start = System.nanoTime(); 123 clientOut.write(message, 0, 1); 124 clientIn.read(message); 125 rtts.add(System.nanoTime() - start); 126 } 127 Collections.sort(rtts); 128 long rtt = rtts.get(0); 129 assertEquals(latency, (double) rtt, .5 * latency); 130 } 131 132 @Test smallBandwidth()133 public void smallBandwidth() throws Exception { 134 server = new Server(); 135 int serverPort = server.init(); 136 server.setMode("stream"); 137 executor.execute(server); 138 139 int bandwidth = 64 * 1024; 140 proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS); 141 proxy.start(); 142 client = new Socket("localhost", proxy.getPort()); 143 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 144 145 clientIn.readFully(new byte[100 * 1024]); 146 int sample = bandwidth / 5; 147 List<Double> bandwidths = new ArrayList<>(); 148 for (int i = 0; i < 5; i++) { 149 long start = System.nanoTime(); 150 clientIn.readFully(new byte[sample]); 151 long duration = System.nanoTime() - start; 152 double actualBandwidth = sample / (((double) duration) / TimeUnit.SECONDS.toNanos(1)); 153 bandwidths.add(actualBandwidth); 154 } 155 Collections.sort(bandwidths); 156 double bandUsed = bandwidths.get(bandwidths.size() - 1); 157 assertEquals(bandwidth, bandUsed, .5 * bandwidth); 158 } 159 160 @Test largeBandwidth()161 public void largeBandwidth() throws Exception { 162 server = new Server(); 163 int serverPort = server.init(); 164 server.setMode("stream"); 165 executor.execute(server); 166 int bandwidth = 10 * 1024 * 1024; 167 proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS); 168 proxy.start(); 169 client = new Socket("localhost", proxy.getPort()); 170 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 171 172 clientIn.readFully(new byte[100 * 1024]); 173 int sample = bandwidth / 5; 174 List<Double> bandwidths = new ArrayList<>(); 175 for (int i = 0; i < 5; i++) { 176 long start = System.nanoTime(); 177 clientIn.readFully(new byte[sample]); 178 long duration = System.nanoTime() - start; 179 double actualBandwidth = sample / (((double) duration) / TimeUnit.SECONDS.toNanos(1)); 180 bandwidths.add(actualBandwidth); 181 } 182 Collections.sort(bandwidths); 183 double bandUsed = bandwidths.get(bandwidths.size() - 1); 184 assertEquals(bandwidth, bandUsed, .5 * bandwidth); 185 } 186 187 // server with echo and streaming modes 188 private static class Server implements Runnable { 189 private ServerSocket server; 190 private String mode = "echo"; 191 setMode(String mode)192 public void setMode(String mode) { 193 this.mode = mode; 194 } 195 196 /** 197 * Initializes server and returns its listening port. 198 */ init()199 public int init() throws IOException { 200 server = new ServerSocket(0); 201 return server.getLocalPort(); 202 } 203 shutDown()204 public void shutDown() throws IOException { 205 server.close(); 206 } 207 208 @Override run()209 public void run() { 210 try { 211 Socket rcv = server.accept(); 212 try { 213 handleSocket(rcv); 214 } finally { 215 rcv.close(); 216 } 217 } catch (IOException e) { 218 throw new RuntimeException(e); 219 } 220 } 221 handleSocket(Socket rcv)222 private void handleSocket(Socket rcv) throws IOException { 223 InputStream serverIn = rcv.getInputStream(); 224 OutputStream serverOut = rcv.getOutputStream(); 225 if (mode.equals("echo")) { 226 ByteStreams.copy(serverIn, serverOut); 227 } else if (mode.equals("stream")) { 228 byte[] message = new byte[1024]; 229 while (true) { 230 try { 231 serverOut.write(message); 232 } catch (IOException ignored) { 233 // Client closed 234 break; 235 } 236 } 237 } else { 238 throw new RuntimeException("Unknown mode: use 'echo' or 'stream'"); 239 } 240 } 241 } 242 } 243