1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static org.junit.Assert.assertEquals; 20 21 import io.netty.util.concurrent.DefaultThreadFactory; 22 import java.io.DataInputStream; 23 import java.io.DataOutputStream; 24 import java.io.IOException; 25 import java.net.ServerSocket; 26 import java.net.Socket; 27 import java.util.concurrent.Future; 28 import java.util.concurrent.LinkedBlockingQueue; 29 import java.util.concurrent.ThreadPoolExecutor; 30 import java.util.concurrent.TimeUnit; 31 import org.junit.After; 32 import org.junit.AfterClass; 33 import org.junit.Test; 34 import org.junit.runner.RunWith; 35 import org.junit.runners.JUnit4; 36 37 @RunWith(JUnit4.class) 38 public class ProxyTest { 39 40 private static ThreadPoolExecutor executor = 41 new ThreadPoolExecutor(8, 8, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 42 new DefaultThreadFactory("proxy-test-pool", true)); 43 44 private TrafficControlProxy proxy; 45 private Socket client; 46 private Server server; 47 48 @AfterClass stopExecutor()49 public static void stopExecutor() { 50 executor.shutdown(); 51 } 52 53 @After shutdownTest()54 public void shutdownTest() throws IOException { 55 proxy.shutDown(); 56 server.shutDown(); 57 client.close(); 58 } 59 60 @Test smallLatency()61 public void smallLatency() throws Exception { 62 server = new Server(); 63 int serverPort = server.init(); 64 executor.execute(server); 65 66 int latency = (int) TimeUnit.MILLISECONDS.toNanos(50); 67 proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS); 68 startProxy(proxy).get(); 69 client = new Socket("localhost", proxy.getPort()); 70 client.setReuseAddress(true); 71 DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); 72 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 73 byte[] message = new byte[1]; 74 75 // warmup 76 for (int i = 0; i < 5; i++) { 77 clientOut.write(message, 0, 1); 78 } 79 clientIn.readFully(new byte[5]); 80 81 // test 82 long start = System.nanoTime(); 83 clientOut.write(message, 0, 1); 84 clientIn.read(message); 85 long stop = System.nanoTime(); 86 87 long rtt = (stop - start); 88 assertEquals(latency, rtt, latency); 89 } 90 91 @Test bigLatency()92 public void bigLatency() throws Exception { 93 server = new Server(); 94 int serverPort = server.init(); 95 executor.execute(server); 96 97 int latency = (int) TimeUnit.MILLISECONDS.toNanos(250); 98 proxy = new TrafficControlProxy(serverPort, 1024 * 1024, latency, TimeUnit.NANOSECONDS); 99 startProxy(proxy).get(); 100 client = new Socket("localhost", proxy.getPort()); 101 DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); 102 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 103 byte[] message = new byte[1]; 104 105 // warmup 106 for (int i = 0; i < 5; i++) { 107 clientOut.write(message, 0, 1); 108 } 109 clientIn.readFully(new byte[5]); 110 111 // test 112 long start = System.nanoTime(); 113 clientOut.write(message, 0, 1); 114 clientIn.read(message); 115 long stop = System.nanoTime(); 116 117 long rtt = (stop - start); 118 assertEquals(latency, rtt, latency); 119 } 120 121 @Test smallBandwidth()122 public void smallBandwidth() throws Exception { 123 server = new Server(); 124 int serverPort = server.init(); 125 server.setMode("stream"); 126 executor.execute(server); 127 assertEquals("stream", server.mode()); 128 129 int bandwidth = 64 * 1024; 130 proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS); 131 startProxy(proxy).get(); 132 client = new Socket("localhost", proxy.getPort()); 133 DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); 134 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 135 136 clientOut.write(new byte[1]); 137 clientIn.readFully(new byte[100 * 1024]); 138 long start = System.nanoTime(); 139 clientIn.readFully(new byte[5 * bandwidth]); 140 long stop = System.nanoTime(); 141 142 long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); 143 assertEquals(bandwidth, bandUsed, .5 * bandwidth); 144 } 145 146 @Test largeBandwidth()147 public void largeBandwidth() throws Exception { 148 server = new Server(); 149 int serverPort = server.init(); 150 server.setMode("stream"); 151 executor.execute(server); 152 assertEquals("stream", server.mode()); 153 int bandwidth = 10 * 1024 * 1024; 154 proxy = new TrafficControlProxy(serverPort, bandwidth, 200, TimeUnit.MILLISECONDS); 155 startProxy(proxy).get(); 156 client = new Socket("localhost", proxy.getPort()); 157 DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); 158 DataInputStream clientIn = new DataInputStream(client.getInputStream()); 159 160 clientOut.write(new byte[1]); 161 clientIn.readFully(new byte[100 * 1024]); 162 long start = System.nanoTime(); 163 clientIn.readFully(new byte[5 * bandwidth]); 164 long stop = System.nanoTime(); 165 166 long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); 167 assertEquals(bandwidth, bandUsed, .5 * bandwidth); 168 } 169 startProxy(final TrafficControlProxy p)170 private Future<?> startProxy(final TrafficControlProxy p) { 171 return executor.submit(new Runnable() { 172 @Override 173 public void run() { 174 try { 175 p.start(); 176 } catch (Exception e) { 177 throw new RuntimeException(e); 178 } 179 } 180 }); 181 } 182 183 // server with echo and streaming modes 184 private static class Server implements Runnable { 185 private ServerSocket server; 186 private Socket rcv; 187 private boolean shutDown; 188 private String mode = "echo"; 189 190 public void setMode(String mode) { 191 this.mode = mode; 192 } 193 194 public String mode() { 195 return mode; 196 } 197 198 /** 199 * Initializes server and returns its listening port. 200 */ 201 public int init() throws IOException { 202 server = new ServerSocket(0); 203 return server.getLocalPort(); 204 } 205 206 public void shutDown() { 207 try { 208 server.close(); 209 rcv.close(); 210 shutDown = true; 211 } catch (IOException e) { 212 shutDown = true; 213 } 214 } 215 216 @Override 217 public void run() { 218 try { 219 rcv = server.accept(); 220 DataInputStream serverIn = new DataInputStream(rcv.getInputStream()); 221 DataOutputStream serverOut = new DataOutputStream(rcv.getOutputStream()); 222 byte[] response = new byte[1024]; 223 if (mode.equals("echo")) { 224 while (!shutDown) { 225 int readable = serverIn.read(response); 226 serverOut.write(response, 0, readable); 227 } 228 } else if (mode.equals("stream")) { 229 serverIn.read(response); 230 byte[] message = new byte[16 * 1024]; 231 while (!shutDown) { 232 serverOut.write(message, 0, message.length); 233 } 234 serverIn.close(); 235 serverOut.close(); 236 rcv.close(); 237 } else { 238 System.out.println("Unknown mode: use 'echo' or 'stream'"); 239 } 240 } catch (IOException e) { 241 throw new RuntimeException(e); 242 } 243 } 244 } 245 } 246